diff --git a/osprey_worker/pyproject.toml b/osprey_worker/pyproject.toml index 588311aa..a0ccb057 100644 --- a/osprey_worker/pyproject.toml +++ b/osprey_worker/pyproject.toml @@ -9,6 +9,7 @@ description = "Add your description here" readme = "README.md" requires-python = ">=3.11" dependencies = [ + "confluent-kafka>=2.14.0", "flask-cors>=6.0.1", "osprey_rpc", ] diff --git a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py index 20a790a7..f8d39efd 100644 --- a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py +++ b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py @@ -1,6 +1,5 @@ from typing import List, Sequence -from kafka import KafkaProducer from osprey.worker._stdlibplugin.execution_result_store_chooser import get_rules_execution_result_storage_backend from osprey.worker.adaptor.plugin_manager import hookimpl_osprey from osprey.worker.lib.config import Config @@ -21,8 +20,9 @@ def register_output_sinks(config: Config) -> Sequence[BaseOutputSink]: client_id = config.expect_str('OSPREY_KAFKA_OUTPUT_CLIENT_ID') sinks.append( KafkaOutputSink( - kafka_topic=output_topic, - kafka_producer=KafkaProducer(bootstrap_servers=bootstrap_servers, client_id=client_id), + bootstrap_servers=bootstrap_servers, + output_topic=output_topic, + client_id=client_id, ) ) diff --git a/osprey_worker/src/osprey/worker/cli/sinks.py b/osprey_worker/src/osprey/worker/cli/sinks.py index 13f1af95..3786c893 100644 --- a/osprey_worker/src/osprey/worker/cli/sinks.py +++ b/osprey_worker/src/osprey/worker/cli/sinks.py @@ -23,7 +23,7 @@ import click import gevent -import kafka +from confluent_kafka import Consumer import sentry_sdk from google.api_core.exceptions import AlreadyExists from google.cloud import pubsub_v1 @@ -51,7 +51,6 @@ from osprey.worker.sinks.sink.input_stream import PostgresInputStream from osprey.worker.sinks.sink.osprey_coordinator_input_stream import OspreyCoordinatorInputStream from osprey.worker.sinks.sink.rules_sink import RulesSink -from osprey.worker.sinks.utils.kafka import PatchedKafkaConsumer LOGGER = get_logger() @@ -81,9 +80,23 @@ def tail_kafka_output_sink() -> None: output_topic = config.get_str('OSPREY_KAFKA_OUTPUT_SINK_TOPIC', 'osprey.execution_results') bootstrap_servers = config.get_str_list('OSPREY_KAFKA_BOOTSTRAP_SERVERS', ['localhost']) - kakfa_consumer = kafka.KafkaConsumer(output_topic, bootstrap_servers=bootstrap_servers) - for event in kakfa_consumer: - print(event) + consumer = Consumer({ + 'bootstrap.servers': ','.join(bootstrap_servers), + 'group.id': 'osprey-tail-output', + 'auto.offset.reset': 'latest', + }) + consumer.subscribe([output_topic]) + try: + while True: + msg = consumer.poll(timeout=1.0) + if msg is None: + continue + if msg.error(): + print(f'Error: {msg.error()}') + continue + print(msg.value()) + finally: + consumer.close() @cli.command() @@ -101,9 +114,25 @@ def tail_kafka_input_sink() -> None: client_id = config.get_str('OSPREY_KAFKA_INPUT_STREAM_CLIENT_ID', 'localhost') bootstrap_servers = config.get_str_list('OSPREY_KAFKA_BOOTSTRAP_SERVERS', ['localhost']) input_topic = config.get_str('OSPREY_KAFKA_INPUT_STREAM_TOPIC', 'osprey.actions_input') - kafka_consumer = PatchedKafkaConsumer(input_topic, bootstrap_servers=bootstrap_servers, client_id=client_id) - for event in kafka_consumer: - print(event) + + consumer = Consumer({ + 'bootstrap.servers': ','.join(bootstrap_servers), + 'client.id': client_id, + 'group.id': 'osprey-tail-input', + 'auto.offset.reset': 'latest', + }) + consumer.subscribe([input_topic]) + try: + while True: + msg = consumer.poll(timeout=1.0) + if msg is None: + continue + if msg.error(): + print(f'Error: {msg.error()}') + continue + print(msg.value()) + finally: + consumer.close() @cli.command() diff --git a/osprey_worker/src/osprey/worker/sinks/input_stream_chooser.py b/osprey_worker/src/osprey/worker/sinks/input_stream_chooser.py index b33c6d1e..6ff4053e 100644 --- a/osprey_worker/src/osprey/worker/sinks/input_stream_chooser.py +++ b/osprey_worker/src/osprey/worker/sinks/input_stream_chooser.py @@ -3,7 +3,8 @@ from datetime import datetime, timedelta from google.cloud import pubsub_v1 -from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor +from confluent_kafka import Consumer +from osprey.worker.sinks.utils.kafka import ThreadedKafkaConsumer from osprey.engine.executor.execution_context import Action from osprey.worker.adaptor.plugin_manager import bootstrap_input_stream from osprey.worker.lib.singletons import CONFIG @@ -15,7 +16,6 @@ ) from osprey.worker.sinks.sink.osprey_coordinator_input_stream import OspreyCoordinatorInputStream from osprey.worker.sinks.utils.acking_contexts import BaseAckingContext, NoopAckingContext -from osprey.worker.sinks.utils.kafka import PatchedKafkaConsumer def get_rules_sink_input_stream( @@ -113,17 +113,22 @@ def get_rules_sink_input_stream( if client_id_suffix: client_id = f'{client_id}-{client_id_suffix}' - consumer: PatchedKafkaConsumer = PatchedKafkaConsumer( - input_topic, - bootstrap_servers=input_bootstrap_servers, - client_id=client_id, - group_id=group_id, - partition_assignment_strategy=(RoundRobinPartitionAssignor,), - ) + consumer_config = { + 'bootstrap.servers': ','.join(input_bootstrap_servers), + 'client.id': client_id, + 'group.id': group_id or 'osprey-consumer', + 'partition.assignment.strategy': 'roundrobin', + 'auto.offset.reset': 'latest', + } + + consumer = Consumer(consumer_config) + consumer.subscribe([input_topic]) + threaded_consumer = ThreadedKafkaConsumer(consumer) + from osprey.worker.sinks.sink.input_stream import KafkaInputStream return KafkaInputStream( - kafka_consumer=consumer, + kafka_consumer=threaded_consumer, ) elif input_stream_source == InputStreamSource.PLUGIN: stream = bootstrap_input_stream(config=config) diff --git a/osprey_worker/src/osprey/worker/sinks/sink/input_stream.py b/osprey_worker/src/osprey/worker/sinks/sink/input_stream.py index 55255c1c..ed2ca1ca 100644 --- a/osprey_worker/src/osprey/worker/sinks/sink/input_stream.py +++ b/osprey_worker/src/osprey/worker/sinks/sink/input_stream.py @@ -8,6 +8,7 @@ import gevent import msgpack import sentry_sdk +from confluent_kafka import Message as KafkaMessage from gevent.lock import RLock from gevent.queue import Queue as GeventQueue from google.api_core import retry @@ -16,7 +17,6 @@ from google.protobuf.message import DecodeError from google.protobuf.message import Message as ProtoMessage from google.pubsub_v1 import PubsubMessage -from kafka.consumer.fetcher import ConsumerRecord from osprey.engine.executor.execution_context import Action from osprey.worker.lib.encryption.envelope import Envelope from osprey.worker.lib.instruments import metrics @@ -29,7 +29,6 @@ PubSubMessageAckingContext, PullPubSubMessageContext, ) -from osprey.worker.sinks.utils.kafka import PatchedKafkaConsumer from pydantic import BaseModel from tenacity import RetryCallState, retry_if_exception_type, stop_never, wait_exponential from tenacity import retry as tenacity_retry @@ -43,6 +42,7 @@ if TYPE_CHECKING: from google.cloud.pubsub_v1.types import PullResponse + from osprey.worker.sinks.utils.kafka import ThreadedKafkaConsumer class BaseInputStream(abc.ABC, Generic[_T]): @@ -413,22 +413,27 @@ def claim_with_retry() -> Optional[_ModelT]: class KafkaInputStream(BaseInputStream[BaseAckingContext[Action]]): """An input stream that consumes messages from a Kafka topic and yields Action objects wrapped in an AckingContext.""" - def __init__(self, kafka_consumer: PatchedKafkaConsumer): + def __init__(self, kafka_consumer: 'ThreadedKafkaConsumer'): super().__init__() - self._consumer: PatchedKafkaConsumer = kafka_consumer + self._consumer = kafka_consumer def _gen(self) -> Iterator[BaseAckingContext[Action]]: while True: try: - with metrics.timed('kafka_consumer.lock_time'): - with metrics.timed('kafka_consumer.poll_time'): - record: ConsumerRecord = next(self._consumer) - data = json.loads(record.value) + with metrics.timed('kafka_consumer.poll_time'): + msg: Optional[KafkaMessage] = self._consumer.poll(timeout=1.0) + + if msg is None: + continue + + if msg.error(): + logger.error(f'Kafka consumer error: {msg.error()}') + sentry_sdk.capture_exception(Exception(str(msg.error()))) + continue + + data = json.loads(msg.value()) timestamp = parse_go_timestamp(data['send_time']) action_data = data['data'] - # this was here for when this was protobuf. If its json by default, we should just assume its all in one - # json blob. - # action_data = json.loads(action_data_json) action = Action( action_id=int(action_data['action_id']), @@ -436,7 +441,6 @@ def _gen(self) -> Iterator[BaseAckingContext[Action]]: data=action_data['data'], timestamp=timestamp, ) - # Wrap in NoopAckingContext for now, or implement a KafkaAckingContext if needed yield NoopAckingContext(action) except Exception as e: logger.exception('Error while consuming from Kafka') diff --git a/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py b/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py index 83c0e765..134b6f61 100644 --- a/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py +++ b/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py @@ -1,29 +1,120 @@ +import platform from typing import Any import sentry_sdk -from kafka import KafkaProducer +from confluent_kafka import Producer +from confluent_kafka.admin import AdminClient, NewTopic from osprey.engine.executor.execution_context import ExecutionResult from osprey.worker.lib.osprey_shared.logging import get_logger from osprey.worker.sinks.sink.output_sink import BaseOutputSink +from osprey.worker.sinks.utils.kafka import ThreadedKafkaProducer logger = get_logger() +class EmptyBootstrapServersException(Exception): + """Exception that is raised whenever the server list provided to KafkaOutputSink is empty.""" + + +class InvalidOutputTopicException(Exception): + """Exception that is raised whenever the output topic passed to KafkaOutputSink is empty.""" + + class KafkaOutputSink(BaseOutputSink): """An output sink that sends the extracted features to a given kafka topic.""" - def __init__(self, kafka_topic: str, kafka_producer: KafkaProducer): - self._kafka_topic = kafka_topic - self._kafka_producer = kafka_producer + def __init__( + self, + bootstrap_servers: list[str], + output_topic: str, + client_id: str | None, + ) -> None: + if len(bootstrap_servers) == 0: + raise EmptyBootstrapServersException() + + if output_topic == '': + raise InvalidOutputTopicException() + + self.logger = get_logger('KafkaOutputSink') + + self._bootstrap_servers = bootstrap_servers + self._output_topic = output_topic + + # NOTE(haileyok): this is...not necessary probably + self.topic_ensured = False + + if client_id is None: + client_hostname = platform.node() + if client_hostname != '': + client_id = f'{client_hostname};host_override={bootstrap_servers[0]}' + else: + client_id = f'osprey-output-sink;host_override={bootstrap_servers[0]}' + + self.logger.info(f'Creating Kafka producer with client id {client_id}') + + config = { + 'bootstrap.servers': ','.join(bootstrap_servers), + 'client.id': client_id, + 'queue.buffering.max.messages': 1_000_000, + 'linger.ms': 10, + 'retries': 10, + 'request.timeout.ms': 30_000, + 'socket.timeout.ms': 30_000, + 'delivery.timeout.ms': 120_000, + 'statistics.interval.ms': 10_000, + 'log.connection.close': False, + 'enable.idempotence': True, + 'acks': 'all', + 'max.in.flight.requests.per.connection': 5, + 'message.max.bytes': 20_000_000, + } + + self._producer = Producer(config) + self._threaded_producer = ThreadedKafkaProducer(self._producer) + self.ensure_topic() + + super().__init__() + + def ensure_topic(self) -> None: + """Create the Kafka topic if it does not yet exist.""" + admin_client = AdminClient({'bootstrap.servers': ','.join(self._bootstrap_servers)}) + + try: + metadata = admin_client.list_topics(timeout=10) + except Exception as e: + self.logger.error(f'Error listing topics, unable to ensure topic: {e}') + return + + if self._output_topic in metadata.topics: + self.topic_ensured = True + return + + self.logger.info(f'Creating topic {self._output_topic}') + try: + # TODO: num_partitions and replication_factor should not be hard coded... + topic = NewTopic(self._output_topic, num_partitions=3, replication_factor=3) + fs = admin_client.create_topics([topic]) + fs[self._output_topic].result() + self.topic_ensured = True + except Exception as e: + self.logger.error(f'Error creating topic, unable to ensure topic: {e}') def will_do_work(self, result: ExecutionResult) -> bool: return True def push(self, result: ExecutionResult) -> None: - kafka_future: Any = self._kafka_producer.send( - topic=self._kafka_topic, value=result.extracted_features_json.encode('utf-8') + self._threaded_producer.produce( + self._output_topic, + value=result.extracted_features_json.encode('utf-8'), + on_delivery=self._on_delivery, ) - kafka_future.add_errback(self.push_err_to_sentry) + + def _on_delivery(self, err: Any, msg: Any) -> None: + if err is not None: + self.push_err_to_sentry(err) + + def flush(self, timeout: float = 30) -> int: + return self._threaded_producer.flush(timeout) @classmethod def push_err_to_sentry(cls, e: Exception) -> None: @@ -31,4 +122,6 @@ def push_err_to_sentry(cls, e: Exception) -> None: sentry_sdk.capture_exception(error=e) def stop(self) -> None: - pass + remaining = self._threaded_producer.close(timeout=10) + if remaining > 0: + logger.warning(f'{remaining} messages were not delivered') diff --git a/osprey_worker/src/osprey/worker/sinks/utils/gevent.py b/osprey_worker/src/osprey/worker/sinks/utils/gevent.py deleted file mode 100644 index 08a38259..00000000 --- a/osprey_worker/src/osprey/worker/sinks/utils/gevent.py +++ /dev/null @@ -1,17 +0,0 @@ -from typing import Optional - -from gevent import sleep -from gevent.lock import RLock - - -class FairRLock(RLock): # type: ignore - """A RLock that is fair, and will allow lock acquisitions that were attempted first to be acquired, - which is distinct from the way that RLock works in gevent, which will allow the greenlet that just released - the lock to re-acquire it, even if there is another greenlet waiting to acquire the lock, thus starving all - other lockers.""" - - def acquire(self, blocking: bool = True, timeout: Optional[float] = None) -> bool: - if blocking and not self._block.locked() and self._block.linkcount(): - sleep(0) - - return super().acquire(blocking=blocking, timeout=timeout) diff --git a/osprey_worker/src/osprey/worker/sinks/utils/kafka.py b/osprey_worker/src/osprey/worker/sinks/utils/kafka.py index 92e5a2d7..394c8c7d 100644 --- a/osprey_worker/src/osprey/worker/sinks/utils/kafka.py +++ b/osprey_worker/src/osprey/worker/sinks/utils/kafka.py @@ -1,13 +1,100 @@ -from typing import Any +"""Threaded wrappers for confluent-kafka that keep librdkafka's C threads off the gevent event loop. -from kafka import KafkaConsumer +confluent-kafka uses librdkafka under the hood. Its internal threads and network I/O bypass +gevent's monkey-patching entirely, which means they can hold the GIL and starve greenlets. These +wrappers run all confluent-kafka operations on dedicated OS threads instead of greenlets, then +bridge back to greenlet-land via gevent async watchers. +""" -from .gevent import FairRLock +from typing import Any, Optional +import gevent._threading as _real_threading +import gevent.queue +import sentry_sdk +from confluent_kafka import Consumer, KafkaException, Message, Producer +from osprey.worker.lib.osprey_shared.logging import get_logger -class PatchedKafkaConsumer(KafkaConsumer): # type: ignore - """A KafkaConsnsumer that has had its lock patched to FairRLock, that prevents deadlocking.""" +logger = get_logger() - def __init__(self, *args: Any, **kwargs: Any) -> None: - super().__init__(*args, **kwargs) - self._client._lock = FairRLock() +_SENTINEL = object() + +import gevent.monkey + +_real_sleep = gevent.monkey.get_original('time', 'sleep') + + +class ThreadedKafkaConsumer: + """Wraps a confluent-kafka Consumer, polling on a real OS thread and delivering messages via a gevent queue.""" + + def __init__(self, consumer: Consumer, queue_maxsize: int = 1000) -> None: + self._consumer = consumer + self._queue: gevent.queue.Queue[Optional[Message]] = gevent.queue.Queue(maxsize=queue_maxsize) + self._running = True + self._thread_ident = _real_threading.start_new_thread(self._poll_loop, ()) + + def _poll_loop(self) -> None: + while self._running: + try: + msg = self._consumer.poll(timeout=1.0) + if msg is not None: + self._queue.put(msg) + except KafkaException as e: + logger.error(f'Kafka consumer poll error: {e}') + sentry_sdk.capture_exception(e) + + def poll(self, timeout: Optional[float] = None) -> Optional[Message]: + """Get the next message from the gevent queue. Safe to call from a greenlet.""" + try: + return self._queue.get(timeout=timeout) + except gevent.queue.Empty: + return None + + def close(self) -> None: + self._running = False + self._consumer.close() + + +class ThreadedKafkaProducer: + """Wraps a confluent-kafka Producer, running produce/poll on a real OS thread.""" + + def __init__(self, producer: Producer, poll_interval: float = 0.1) -> None: + self._producer = producer + self._queue: gevent.queue.Queue = gevent.queue.Queue() + self._poll_interval = poll_interval + self._running = True + self._thread_ident = _real_threading.start_new_thread(self._produce_loop, ()) + + def _produce_loop(self) -> None: + while self._running: + self._producer.poll(0) + + try: + item = self._queue.get_nowait() + except gevent.queue.Empty: + _real_sleep(self._poll_interval) + continue + + if item is _SENTINEL: + break + + topic, value, on_delivery = item + try: + self._producer.produce(topic, value=value, on_delivery=on_delivery) + except BufferError: + logger.warning('Producer queue full, flushing before retry') + self._producer.flush(timeout=5) + self._producer.produce(topic, value=value, on_delivery=on_delivery) + + def produce(self, topic: str, value: bytes, on_delivery: Any = None) -> None: + """Queue a message for production. Safe to call from a greenlet.""" + self._queue.put((topic, value, on_delivery)) + + def flush(self, timeout: float = 30) -> int: + """Flush pending messages. Blocks until complete.""" + return self._producer.flush(timeout) + + def close(self, timeout: float = 10) -> int: + """Stop the producer thread and flush remaining messages.""" + self._running = False + self._queue.put(_SENTINEL) + return self._producer.flush(timeout) diff --git a/pyproject.toml b/pyproject.toml index 503445bb..84f241fb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,7 +46,7 @@ common = [ "intervals==0.9.2", "jslog4kube==1.0.6", "jsonpath-rw", - "kafka-python==1.4.7", + "confluent-kafka>=2.14.0", "minio>=7.2.16", "mmh3==3.0.0", "msgpack==1.0.8", @@ -302,7 +302,7 @@ module = [ "deepmerge.*", "greenlet.*", "Levenshtein.*", - "kafka.*", + "confluent_kafka.*", "pythonjsonlogger.*", "msgpack.*", "jose.*", diff --git a/uv.lock b/uv.lock index d410aafb..166a895e 100644 --- a/uv.lock +++ b/uv.lock @@ -31,6 +31,7 @@ overrides = [{ name = "ddtrace", specifier = ">=2.17.2,<3.0.0" }] common = [ { name = "blinker", specifier = "==1.9.0" }, { name = "click", specifier = "==7.1.2" }, + { name = "confluent-kafka", specifier = ">=2.14.0" }, { name = "datadog", specifier = "==0.51.0" }, { name = "ddtrace", specifier = "==2.21.11" }, { name = "deepmerge", specifier = "==0.3.0" }, @@ -71,7 +72,6 @@ common = [ { name = "intervals", specifier = "==0.9.2" }, { name = "jslog4kube", specifier = "==1.0.6" }, { name = "jsonpath-rw", url = "https://github.com/kennknowles/python-jsonpath-rw/archive/6f5647bb3ad2395c20f0191fef07a1df51c9fed8.tar.gz" }, - { name = "kafka-python", specifier = "==1.4.7" }, { name = "minio", specifier = ">=7.2.16" }, { name = "mmh3", specifier = "==3.0.0" }, { name = "msgpack", specifier = "==1.0.8" }, @@ -384,6 +384,34 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" }, ] +[[package]] +name = "confluent-kafka" +version = "2.14.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/40/52/2c71d8e0b2de51076f90cea05342dc9c20fa14ded11992827680db4bbdfa/confluent_kafka-2.14.0.tar.gz", hash = "sha256:34efddfd06766d1153d10a70c23a98f6035e253a906db8ed04cb0249fc3b0fd2", size = 287868, upload-time = "2026-04-02T11:28:57.862Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/6c/87/ae316df6411e87c14acf9d83bff12582a0f45dea76df1b5d2a623701d389/confluent_kafka-2.14.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:21c5ce6f388a5bd5d8f102026250faa528694cf7cf71fb6a1b321dad27874c1c", size = 3645064, upload-time = "2026-04-02T11:27:56.259Z" }, + { url = "https://files.pythonhosted.org/packages/41/5f/b154300af17a3f03a8fb071d9e3ec857d60b27191560f323b750966272ba/confluent_kafka-2.14.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2eaeabe10c44f8b4c0b879602e804548f2c49eae9e5d40a6d88e372c4c876810", size = 3203249, upload-time = "2026-04-02T11:27:58.556Z" }, + { url = "https://files.pythonhosted.org/packages/a2/5e/2d051982b3097e5bee61e2b38d1c95079db78b6c13bafe750d159fcfe6d1/confluent_kafka-2.14.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:163f30cfe49f56d85d208ddf6db50a64cb4156d85ff0d5925cbf53c9c1ff5229", size = 3734511, upload-time = "2026-04-02T11:28:00.939Z" }, + { url = "https://files.pythonhosted.org/packages/05/fd/17f9982fc7ba905c2f667a6a5b8ce8fd032d42c97235235d35ccee172b70/confluent_kafka-2.14.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:c86583d81b71c097e7fb9b2b7e45aa7e3f43cf23308319e0d6bd6eede3027732", size = 3991298, upload-time = "2026-04-02T11:28:03.375Z" }, + { url = "https://files.pythonhosted.org/packages/3d/16/069b1d090fa76a62dc3c897f2b081f42208cc8ee3c7dfa75d8e0852c9935/confluent_kafka-2.14.0-cp311-cp311-win_amd64.whl", hash = "sha256:74a9af680de1aab2a701c3a4645275374f556cabbf559fa569450fe4f8c61e25", size = 4111893, upload-time = "2026-04-02T11:28:05.799Z" }, + { url = "https://files.pythonhosted.org/packages/12/05/f27091396c1e5fb98844e3e8b114ec7b896d1b54209e796e3946649de2cd/confluent_kafka-2.14.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:737b63f2389c9d63f3da0923681aa95abad1cb2f96b10f38192ef19ab727c883", size = 3650743, upload-time = "2026-04-02T11:28:07.697Z" }, + { url = "https://files.pythonhosted.org/packages/9e/49/b9de672412c4290b4719f99ac17b31ff35c64b221e4961a3047f6c1f334f/confluent_kafka-2.14.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:1610aa31880c874bfa3351d898d6e6cdbfab2a0f9443598fd64425bbc815cb06", size = 3207894, upload-time = "2026-04-02T11:28:09.813Z" }, + { url = "https://files.pythonhosted.org/packages/fb/b6/d892b50a48bbd95e8937d557baf89ffa07fc48bc27f792141476a004334d/confluent_kafka-2.14.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:9cca8929bbc3d68a3299b21239c48def860f04e4661c7a59efe3104ecaea0e08", size = 3739440, upload-time = "2026-04-02T11:28:11.595Z" }, + { url = "https://files.pythonhosted.org/packages/f2/27/04d0f106820219e2621cf9e9a3ab49e910b7a19e55a72a21768b82031a85/confluent_kafka-2.14.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:4d2e4718371c06579f649835239d1acf6ab5386a88f70e9cb9b839855c83c4a9", size = 3995763, upload-time = "2026-04-02T11:28:14.46Z" }, + { url = "https://files.pythonhosted.org/packages/64/d9/46258cefee841d65dda31d20ce61d12f7573e07ef8d26f49169edfd0b0fa/confluent_kafka-2.14.0-cp312-cp312-win_amd64.whl", hash = "sha256:c37aff51512e817316edd6eafa8a2e59745052a7d1e61e09931b1caa11803266", size = 4112399, upload-time = "2026-04-02T11:28:16.264Z" }, + { url = "https://files.pythonhosted.org/packages/26/a3/13ca4b42c580cb8e8d4bc0711467c7c501573f0133dcaf1ed6d7e34abb42/confluent_kafka-2.14.0-cp313-cp313-macosx_13_0_arm64.whl", hash = "sha256:a6dc0e49e8ac99854bd89ec7ac16c54af4488c7617baa633e615320dfbe44b25", size = 3212698, upload-time = "2026-04-02T11:28:18.351Z" }, + { url = "https://files.pythonhosted.org/packages/27/f6/3b4744a8d1b7714500e830a615671d27f76bf64c15966740cc6ee1c960f7/confluent_kafka-2.14.0-cp313-cp313-macosx_13_0_x86_64.whl", hash = "sha256:308c972b23f44e4d0eb3e76b987872c9a7d04148a5a4f29313bbbec3841d75b4", size = 3654148, upload-time = "2026-04-02T11:28:20.532Z" }, + { url = "https://files.pythonhosted.org/packages/48/9b/928775785983a2840c1944a689308e346badb2475765030f8e2a0db21f7a/confluent_kafka-2.14.0-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:9b0acf2fffa19a6ffc2d6f0b82f3b7f1771f5d3943312438f3532ae69b6f2e83", size = 3739774, upload-time = "2026-04-02T11:28:22.283Z" }, + { url = "https://files.pythonhosted.org/packages/c7/37/c2d7a24f0c12673c763b25c2b32defe3b47b8458ad54befd842b6a3a0cde/confluent_kafka-2.14.0-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:0023a941dbd8a2325e9e0d13ed1b2236c7d4ff3279b3d99cf06cf1409ab26d22", size = 3996169, upload-time = "2026-04-02T11:28:24.639Z" }, + { url = "https://files.pythonhosted.org/packages/be/fe/4c2e517a404110adbb5b560dafb5d0b3ba36c2af47d52b5508c90f65d5b0/confluent_kafka-2.14.0-cp313-cp313-win_amd64.whl", hash = "sha256:3da898df3ebb866f61312365e9108cbadcfe74fb73af8d03add856542e715cfe", size = 4172080, upload-time = "2026-04-02T11:28:26.801Z" }, + { url = "https://files.pythonhosted.org/packages/f8/07/e217beea9a543c53484144164db337b33ec7f95912cc76f09f03fbc6ee7f/confluent_kafka-2.14.0-cp314-cp314-macosx_13_0_arm64.whl", hash = "sha256:05bbf9745cadb1a6fd3b03508572d2cd5455d8d9960a437537ddac9d3f89ee49", size = 3212541, upload-time = "2026-04-02T11:28:28.882Z" }, + { url = "https://files.pythonhosted.org/packages/5c/73/cbb44df7afa3ac8746e0ebc37be5f457d0e91e32648c144226da26c5f682/confluent_kafka-2.14.0-cp314-cp314-macosx_13_0_x86_64.whl", hash = "sha256:32a72ff85d7b4428532aa477b8dfa4223a5c69f4e90fecaa64e1924cc99a06b6", size = 3653993, upload-time = "2026-04-02T11:28:31.042Z" }, + { url = "https://files.pythonhosted.org/packages/ae/49/49d9e62ff70a06e68c96dd65d8e621583e6b51682ccc08051ec585bfdf96/confluent_kafka-2.14.0-cp314-cp314-manylinux_2_28_aarch64.whl", hash = "sha256:4fd75d53e0e36f7ff9c5454f7a3cf4a54790db3bfda169c3b582ddc97111f6f6", size = 3739535, upload-time = "2026-04-02T11:28:32.844Z" }, + { url = "https://files.pythonhosted.org/packages/33/6a/df467787418c24e063ed0c19e96aedf05c26eabc32d8adc75235d45d830b/confluent_kafka-2.14.0-cp314-cp314-manylinux_2_28_x86_64.whl", hash = "sha256:eb17528ec7b177ec5e38214852f3dadb5d77172e0fb25c7c992c0cbc3dcfbaa2", size = 3995845, upload-time = "2026-04-02T11:28:34.538Z" }, + { url = "https://files.pythonhosted.org/packages/f0/0a/c5ce2a48ece0ae2dd050ab28d4cd81b9efc610276a4e72f622582f5371d3/confluent_kafka-2.14.0-cp314-cp314-win_amd64.whl", hash = "sha256:578afb532ded604cb98174a14a88847367191bcbe4f52a1661f5238dc5cf75dd", size = 4290326, upload-time = "2026-04-02T11:28:36.679Z" }, +] + [[package]] name = "datadog" version = "0.51.0" @@ -1301,15 +1329,6 @@ requires-dist = [ { name = "six" }, ] -[[package]] -name = "kafka-python" -version = "1.4.7" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/58/20/2a2494fe7b4bf4cb36c060f88a55adcd944b7876877691a2f28cd544467b/kafka-python-1.4.7.tar.gz", hash = "sha256:2f29baad4b3efe05a2bb81ac268855aa01cbc68397f15bac77b494ffd7e2cada", size = 290977, upload-time = "2019-09-30T21:13:20.078Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/49/c9/9863483a1353700ba87821b4f39085eb18fd1bcbb1e954c697177d67f03f/kafka_python-1.4.7-py2.py3-none-any.whl", hash = "sha256:4fbebebfcb6fc94903fb720fe883d7bbec7298f4f1acb857c21dd3b4b114ba4b", size = 266121, upload-time = "2019-09-30T21:13:17.697Z" }, -] - [[package]] name = "legacy-cgi" version = "2.6.4" @@ -1577,12 +1596,14 @@ name = "osprey-worker" version = "0.1.0" source = { editable = "osprey_worker" } dependencies = [ + { name = "confluent-kafka" }, { name = "flask-cors" }, { name = "osprey-rpc" }, ] [package.metadata] requires-dist = [ + { name = "confluent-kafka", specifier = ">=2.14.0" }, { name = "flask-cors", specifier = ">=6.0.1" }, { name = "osprey-rpc", editable = "osprey_rpc" }, ]