From ef703cae324323e43d384c99ae074413911f119a Mon Sep 17 00:00:00 2001 From: hailey Date: Sun, 12 Apr 2026 22:40:49 +0000 Subject: [PATCH 01/10] refactor output sinks to use confluent kafka --- osprey_worker/pyproject.toml | 1 + .../worker/sinks/sink/kafka_output_sink.py | 129 ++++++++++++++++-- uv.lock | 40 +++++- 3 files changed, 157 insertions(+), 13 deletions(-) diff --git a/osprey_worker/pyproject.toml b/osprey_worker/pyproject.toml index 588311aa..a0ccb057 100644 --- a/osprey_worker/pyproject.toml +++ b/osprey_worker/pyproject.toml @@ -9,6 +9,7 @@ description = "Add your description here" readme = "README.md" requires-python = ">=3.11" dependencies = [ + "confluent-kafka>=2.14.0", "flask-cors>=6.0.1", "osprey_rpc", ] diff --git a/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py b/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py index 83c0e765..a2a9b370 100644 --- a/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py +++ b/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py @@ -1,7 +1,9 @@ +import platform from typing import Any import sentry_sdk -from kafka import KafkaProducer +from confluent_kafka import Producer +from confluent_kafka.admin import AdminClient, NewTopic from osprey.engine.executor.execution_context import ExecutionResult from osprey.worker.lib.osprey_shared.logging import get_logger from osprey.worker.sinks.sink.output_sink import BaseOutputSink @@ -9,21 +11,128 @@ logger = get_logger() +class EmptyBootstrapServersException(Exception): + """Exception that is raised whenever the server list provided to KafkaOutputSink is empty.""" + + +class InvalidOutputTopicException(Exception): + """Exception that is raised whenever the output topic passed to KafkaOutputSink is empty.""" + + class KafkaOutputSink(BaseOutputSink): """An output sink that sends the extracted features to a given kafka topic.""" - def __init__(self, kafka_topic: str, kafka_producer: KafkaProducer): - self._kafka_topic = kafka_topic - self._kafka_producer = kafka_producer + def __init__( + self, + bootstrap_servers: list[str], + output_topic: str, + client_id: str | None, + poll_every: int = 20, + ) -> None: + if len(bootstrap_servers) == 0: + raise EmptyBootstrapServersException() + + if output_topic == '': + raise InvalidOutputTopicException() + + self.logger = get_logger('KafkaOutputSink') + + self._bootstrap_servers = bootstrap_servers + self._output_topic = output_topic + self._poll_every = poll_every + self._message_count = 0 + + # NOTE(haileyok): this is...not necessary probably + self.topic_ensured = False + + if client_id is None: + client_hostname = platform.node() + if client_hostname != '': + client_id = f'{client_hostname};host_override={bootstrap_servers[0]}' + else: + client_id = f'osprey-output-sink;host_override={bootstrap_servers[0]}' + + self.logger.info(f'Creating Kafka producer with client id {client_id}') + + config = { + 'bootstrap.servers': ','.join(bootstrap_servers), + 'client.id': client_id, + 'queue.buffering.max.messages': 1_000_000, + 'linger.ms': 10, + 'retries': 10, + 'request.timeout.ms': 30_000, + 'socket.timeout.ms': 30_000, + 'delivery.timeout.ms': 120_000, + 'statistics.interval.ms': 10_000, + 'log.connection.close': False, + 'enable.idempotence': True, + 'acks': 'all', + 'max.in.flight.requests.per.connection': 5, + 'max.message.bytes': 20_000_000, + } + + self.producer = Producer(config) + self.ensure_topic() + + super().__init__() + + def ensure_topic(self) -> None: + """Create the Kafka topic if it does not yet exist.""" + admin_client = AdminClient({'bootstrap.servers': ','.join(self._bootstrap_servers)}) + + try: + metadata = admin_client.list_topics(timeout=10) + except Exception as e: + self.logger.error(f'Error listing topics, unable to ensure topic: {e}') + return + + if self._output_topic in metadata.topics: + self.topic_ensured = True + return + + self.logger.info(f'Creating topic {self._output_topic}') + try: + # TODO: num_partitions and replication_factor should not be hard coded... + topic = NewTopic(self._output_topic, num_partitions=3, replication_factor=3) + fs = admin_client.create_topics([topic]) + fs[self._output_topic].result() + self.topic_ensured = True + except Exception as e: + self.logger.error(f'Error creating topic, unable to ensure topic: {e}') + + def _handle_polling(self) -> None: + """Poll periodically based on message count""" + self._message_count += 1 + if self._message_count % self._poll_every == 0: + self.producer.poll(0) def will_do_work(self, result: ExecutionResult) -> bool: return True def push(self, result: ExecutionResult) -> None: - kafka_future: Any = self._kafka_producer.send( - topic=self._kafka_topic, value=result.extracted_features_json.encode('utf-8') - ) - kafka_future.add_errback(self.push_err_to_sentry) + try: + self.producer.produce( + self._output_topic, + value=result.extracted_features_json.encode('utf-8'), + on_delivery=self._on_delivery, + ) + except BufferError: + self.logger.warning('Producer queue full, flushing before retry') + self.producer.flush(timeout=5) + self.producer.produce( + self._output_topic, + value=result.extracted_features_json.encode('utf-8'), + on_delivery=self._on_delivery, + ) + + self._handle_polling() + + def _on_delivery(self, err: Any, msg: Any) -> None: + if err is not None: + self.push_err_to_sentry(err) + + def flush(self, timeout: float = 30) -> int: + return self.producer.flush(timeout) @classmethod def push_err_to_sentry(cls, e: Exception) -> None: @@ -31,4 +140,6 @@ def push_err_to_sentry(cls, e: Exception) -> None: sentry_sdk.capture_exception(error=e) def stop(self) -> None: - pass + remaining = self.flush(timeout=10) + if remaining > 0: + logger.warning(f'{remaining} messages were not delivered') diff --git a/uv.lock b/uv.lock index 70e207bf..212fb5d8 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.11" resolution-markers = [ "python_full_version >= '4' and platform_machine == 'x86_64'", @@ -383,6 +383,34 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" }, ] +[[package]] +name = "confluent-kafka" +version = "2.14.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/40/52/2c71d8e0b2de51076f90cea05342dc9c20fa14ded11992827680db4bbdfa/confluent_kafka-2.14.0.tar.gz", hash = "sha256:34efddfd06766d1153d10a70c23a98f6035e253a906db8ed04cb0249fc3b0fd2", size = 287868, upload-time = "2026-04-02T11:28:57.862Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/6c/87/ae316df6411e87c14acf9d83bff12582a0f45dea76df1b5d2a623701d389/confluent_kafka-2.14.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:21c5ce6f388a5bd5d8f102026250faa528694cf7cf71fb6a1b321dad27874c1c", size = 3645064, upload-time = "2026-04-02T11:27:56.259Z" }, + { url = "https://files.pythonhosted.org/packages/41/5f/b154300af17a3f03a8fb071d9e3ec857d60b27191560f323b750966272ba/confluent_kafka-2.14.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2eaeabe10c44f8b4c0b879602e804548f2c49eae9e5d40a6d88e372c4c876810", size = 3203249, upload-time = "2026-04-02T11:27:58.556Z" }, + { url = "https://files.pythonhosted.org/packages/a2/5e/2d051982b3097e5bee61e2b38d1c95079db78b6c13bafe750d159fcfe6d1/confluent_kafka-2.14.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:163f30cfe49f56d85d208ddf6db50a64cb4156d85ff0d5925cbf53c9c1ff5229", size = 3734511, upload-time = "2026-04-02T11:28:00.939Z" }, + { url = "https://files.pythonhosted.org/packages/05/fd/17f9982fc7ba905c2f667a6a5b8ce8fd032d42c97235235d35ccee172b70/confluent_kafka-2.14.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:c86583d81b71c097e7fb9b2b7e45aa7e3f43cf23308319e0d6bd6eede3027732", size = 3991298, upload-time = "2026-04-02T11:28:03.375Z" }, + { url = "https://files.pythonhosted.org/packages/3d/16/069b1d090fa76a62dc3c897f2b081f42208cc8ee3c7dfa75d8e0852c9935/confluent_kafka-2.14.0-cp311-cp311-win_amd64.whl", hash = "sha256:74a9af680de1aab2a701c3a4645275374f556cabbf559fa569450fe4f8c61e25", size = 4111893, upload-time = "2026-04-02T11:28:05.799Z" }, + { url = "https://files.pythonhosted.org/packages/12/05/f27091396c1e5fb98844e3e8b114ec7b896d1b54209e796e3946649de2cd/confluent_kafka-2.14.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:737b63f2389c9d63f3da0923681aa95abad1cb2f96b10f38192ef19ab727c883", size = 3650743, upload-time = "2026-04-02T11:28:07.697Z" }, + { url = "https://files.pythonhosted.org/packages/9e/49/b9de672412c4290b4719f99ac17b31ff35c64b221e4961a3047f6c1f334f/confluent_kafka-2.14.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:1610aa31880c874bfa3351d898d6e6cdbfab2a0f9443598fd64425bbc815cb06", size = 3207894, upload-time = "2026-04-02T11:28:09.813Z" }, + { url = "https://files.pythonhosted.org/packages/fb/b6/d892b50a48bbd95e8937d557baf89ffa07fc48bc27f792141476a004334d/confluent_kafka-2.14.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:9cca8929bbc3d68a3299b21239c48def860f04e4661c7a59efe3104ecaea0e08", size = 3739440, upload-time = "2026-04-02T11:28:11.595Z" }, + { url = "https://files.pythonhosted.org/packages/f2/27/04d0f106820219e2621cf9e9a3ab49e910b7a19e55a72a21768b82031a85/confluent_kafka-2.14.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:4d2e4718371c06579f649835239d1acf6ab5386a88f70e9cb9b839855c83c4a9", size = 3995763, upload-time = "2026-04-02T11:28:14.46Z" }, + { url = "https://files.pythonhosted.org/packages/64/d9/46258cefee841d65dda31d20ce61d12f7573e07ef8d26f49169edfd0b0fa/confluent_kafka-2.14.0-cp312-cp312-win_amd64.whl", hash = "sha256:c37aff51512e817316edd6eafa8a2e59745052a7d1e61e09931b1caa11803266", size = 4112399, upload-time = "2026-04-02T11:28:16.264Z" }, + { url = "https://files.pythonhosted.org/packages/26/a3/13ca4b42c580cb8e8d4bc0711467c7c501573f0133dcaf1ed6d7e34abb42/confluent_kafka-2.14.0-cp313-cp313-macosx_13_0_arm64.whl", hash = "sha256:a6dc0e49e8ac99854bd89ec7ac16c54af4488c7617baa633e615320dfbe44b25", size = 3212698, upload-time = "2026-04-02T11:28:18.351Z" }, + { url = "https://files.pythonhosted.org/packages/27/f6/3b4744a8d1b7714500e830a615671d27f76bf64c15966740cc6ee1c960f7/confluent_kafka-2.14.0-cp313-cp313-macosx_13_0_x86_64.whl", hash = "sha256:308c972b23f44e4d0eb3e76b987872c9a7d04148a5a4f29313bbbec3841d75b4", size = 3654148, upload-time = "2026-04-02T11:28:20.532Z" }, + { url = "https://files.pythonhosted.org/packages/48/9b/928775785983a2840c1944a689308e346badb2475765030f8e2a0db21f7a/confluent_kafka-2.14.0-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:9b0acf2fffa19a6ffc2d6f0b82f3b7f1771f5d3943312438f3532ae69b6f2e83", size = 3739774, upload-time = "2026-04-02T11:28:22.283Z" }, + { url = "https://files.pythonhosted.org/packages/c7/37/c2d7a24f0c12673c763b25c2b32defe3b47b8458ad54befd842b6a3a0cde/confluent_kafka-2.14.0-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:0023a941dbd8a2325e9e0d13ed1b2236c7d4ff3279b3d99cf06cf1409ab26d22", size = 3996169, upload-time = "2026-04-02T11:28:24.639Z" }, + { url = "https://files.pythonhosted.org/packages/be/fe/4c2e517a404110adbb5b560dafb5d0b3ba36c2af47d52b5508c90f65d5b0/confluent_kafka-2.14.0-cp313-cp313-win_amd64.whl", hash = "sha256:3da898df3ebb866f61312365e9108cbadcfe74fb73af8d03add856542e715cfe", size = 4172080, upload-time = "2026-04-02T11:28:26.801Z" }, + { url = "https://files.pythonhosted.org/packages/f8/07/e217beea9a543c53484144164db337b33ec7f95912cc76f09f03fbc6ee7f/confluent_kafka-2.14.0-cp314-cp314-macosx_13_0_arm64.whl", hash = "sha256:05bbf9745cadb1a6fd3b03508572d2cd5455d8d9960a437537ddac9d3f89ee49", size = 3212541, upload-time = "2026-04-02T11:28:28.882Z" }, + { url = "https://files.pythonhosted.org/packages/5c/73/cbb44df7afa3ac8746e0ebc37be5f457d0e91e32648c144226da26c5f682/confluent_kafka-2.14.0-cp314-cp314-macosx_13_0_x86_64.whl", hash = "sha256:32a72ff85d7b4428532aa477b8dfa4223a5c69f4e90fecaa64e1924cc99a06b6", size = 3653993, upload-time = "2026-04-02T11:28:31.042Z" }, + { url = "https://files.pythonhosted.org/packages/ae/49/49d9e62ff70a06e68c96dd65d8e621583e6b51682ccc08051ec585bfdf96/confluent_kafka-2.14.0-cp314-cp314-manylinux_2_28_aarch64.whl", hash = "sha256:4fd75d53e0e36f7ff9c5454f7a3cf4a54790db3bfda169c3b582ddc97111f6f6", size = 3739535, upload-time = "2026-04-02T11:28:32.844Z" }, + { url = "https://files.pythonhosted.org/packages/33/6a/df467787418c24e063ed0c19e96aedf05c26eabc32d8adc75235d45d830b/confluent_kafka-2.14.0-cp314-cp314-manylinux_2_28_x86_64.whl", hash = "sha256:eb17528ec7b177ec5e38214852f3dadb5d77172e0fb25c7c992c0cbc3dcfbaa2", size = 3995845, upload-time = "2026-04-02T11:28:34.538Z" }, + { url = "https://files.pythonhosted.org/packages/f0/0a/c5ce2a48ece0ae2dd050ab28d4cd81b9efc610276a4e72f622582f5371d3/confluent_kafka-2.14.0-cp314-cp314-win_amd64.whl", hash = "sha256:578afb532ded604cb98174a14a88847367191bcbe4f52a1661f5238dc5cf75dd", size = 4290326, upload-time = "2026-04-02T11:28:36.679Z" }, +] + [[package]] name = "datadog" version = "0.51.0" @@ -497,7 +525,7 @@ dependencies = [ [[package]] name = "example-plugins" version = "0.1.0" -source = { editable = "example_plugins" } +source = { virtual = "example_plugins" } dependencies = [ { name = "pluggy" }, ] @@ -924,6 +952,7 @@ dependencies = [ ] sdist = { url = "https://files.pythonhosted.org/packages/a3/1c/c42834d4fee45c5cf2d9546e97e879a1cbcdecfd16eb1a12144dcb91edae/grpcio-1.49.1.tar.gz", hash = "sha256:d4725fc9ec8e8822906ae26bb26f5546891aa7fbc3443de970cc556d43a5c99f", size = 22059239, upload-time = "2022-09-22T03:02:44.376Z" } wheels = [ + { url = "https://files.pythonhosted.org/packages/2d/e2/aaccddb8b06637625d847dbb5fe76ec3d15a74d89d983f5202f3666706e3/grpcio-1.49.1-cp311-cp311-linux_armv7l.whl", hash = "sha256:9fb17ff8c0d56099ac6ebfa84f670c5a62228d6b5c695cf21c02160c2ac1446b", size = 73399185, upload-time = "2022-09-22T02:57:56.219Z" }, { url = "https://files.pythonhosted.org/packages/90/0f/4d614d59f500835cbd27cb90743fb6b299098b0f22b8fd058d3586c933c0/grpcio-1.49.1-cp311-cp311-macosx_10_10_x86_64.whl", hash = "sha256:075f2d06e3db6b48a2157a1bcd52d6cbdca980dd18988fe6afdb41795d51625f", size = 4296299, upload-time = "2022-09-22T02:58:01.417Z" }, { url = "https://files.pythonhosted.org/packages/4d/ea/359a98f8b3b4ff9a2f457a0d20ed81775a64149fbb7617177ed23d9d10c9/grpcio-1.49.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dc79b2b37d779ac42341ddef40ad5bf0966a64af412c89fc2b062e3ddabb093f", size = 4656437, upload-time = "2022-09-22T02:58:06.23Z" }, { url = "https://files.pythonhosted.org/packages/fc/89/4952d2dff95f5b95db5943b2d1b55c82a485830b992f52f212b33616b523/grpcio-1.49.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:49b301740cf5bc8fed4fee4c877570189ae3951432d79fa8e524b09353659811", size = 4888051, upload-time = "2022-09-22T02:58:11.411Z" }, @@ -1052,6 +1081,7 @@ dependencies = [ ] sdist = { url = "https://files.pythonhosted.org/packages/6c/e4/3416d25aebc4477141a491fae2c9494c5e0437a706c59103a936aac7d072/grpcio-tools-1.49.1.tar.gz", hash = "sha256:84cc64e5b46bad43d5d7bd2fd772b656eba0366961187a847e908e2cb735db91", size = 2252679, upload-time = "2022-09-22T03:03:00.279Z" } wheels = [ + { url = "https://files.pythonhosted.org/packages/e4/c1/ba298fe650b67c9e31a7ad88b2fe1d8d22ff2c6a9e131604054835397dfc/grpcio_tools-1.49.1-cp311-cp311-linux_armv7l.whl", hash = "sha256:9e5c13809ab2f245398e8446c4c3b399a62d591db651e46806cccf52a700452e", size = 36912892, upload-time = "2022-09-22T03:00:51.237Z" }, { url = "https://files.pythonhosted.org/packages/9c/8b/a45a39bf7d1c4956d48179831e4da88c3f6ee14dbdcb273e575bbeb7de20/grpcio_tools-1.49.1-cp311-cp311-macosx_10_10_x86_64.whl", hash = "sha256:ab3d0ee9623720ee585fdf3753b3755d3144a4a8ae35bca8e3655fa2f41056be", size = 2025040, upload-time = "2022-09-22T03:00:55.219Z" }, { url = "https://files.pythonhosted.org/packages/6d/7f/89dc6036b91f8cbada98b06801ac2f5db60885000feaf88f9d7cabe665b7/grpcio_tools-1.49.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:13e13b3643e7577a3ec13b79689eb4d7548890b1e104c04b9ed6557a3c3dd452", size = 2370982, upload-time = "2022-09-22T03:00:59.807Z" }, { url = "https://files.pythonhosted.org/packages/01/98/4730bfff6bcd3163db8c3d70689e19a1a5f419152316edfc1f13ff06a5d7/grpcio_tools-1.49.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:a64bab81b220c50033f584f57978ebbea575f09c1ccee765cd5c462177988098", size = 2731915, upload-time = "2022-09-22T03:01:05.44Z" }, @@ -1557,21 +1587,23 @@ wheels = [ [[package]] name = "osprey-rpc" version = "0.1.0" -source = { editable = "osprey_rpc" } +source = { virtual = "osprey_rpc" } [[package]] name = "osprey-worker" version = "0.1.0" source = { editable = "osprey_worker" } dependencies = [ + { name = "confluent-kafka" }, { name = "flask-cors" }, { name = "osprey-rpc" }, ] [package.metadata] requires-dist = [ + { name = "confluent-kafka", specifier = ">=2.14.0" }, { name = "flask-cors", specifier = ">=6.0.1" }, - { name = "osprey-rpc", editable = "osprey_rpc" }, + { name = "osprey-rpc", virtual = "osprey_rpc" }, ] [[package]] From e38532cb6f16f917fcf792bd3503ed50e8c94d55 Mon Sep 17 00:00:00 2001 From: hailey Date: Sun, 12 Apr 2026 16:24:46 -0700 Subject: [PATCH 02/10] uv lock --- uv.lock | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/uv.lock b/uv.lock index 212fb5d8..bdada683 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.11" resolution-markers = [ "python_full_version >= '4' and platform_machine == 'x86_64'", @@ -525,7 +525,7 @@ dependencies = [ [[package]] name = "example-plugins" version = "0.1.0" -source = { virtual = "example_plugins" } +source = { editable = "example_plugins" } dependencies = [ { name = "pluggy" }, ] @@ -952,7 +952,6 @@ dependencies = [ ] sdist = { url = "https://files.pythonhosted.org/packages/a3/1c/c42834d4fee45c5cf2d9546e97e879a1cbcdecfd16eb1a12144dcb91edae/grpcio-1.49.1.tar.gz", hash = "sha256:d4725fc9ec8e8822906ae26bb26f5546891aa7fbc3443de970cc556d43a5c99f", size = 22059239, upload-time = "2022-09-22T03:02:44.376Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/2d/e2/aaccddb8b06637625d847dbb5fe76ec3d15a74d89d983f5202f3666706e3/grpcio-1.49.1-cp311-cp311-linux_armv7l.whl", hash = "sha256:9fb17ff8c0d56099ac6ebfa84f670c5a62228d6b5c695cf21c02160c2ac1446b", size = 73399185, upload-time = "2022-09-22T02:57:56.219Z" }, { url = "https://files.pythonhosted.org/packages/90/0f/4d614d59f500835cbd27cb90743fb6b299098b0f22b8fd058d3586c933c0/grpcio-1.49.1-cp311-cp311-macosx_10_10_x86_64.whl", hash = "sha256:075f2d06e3db6b48a2157a1bcd52d6cbdca980dd18988fe6afdb41795d51625f", size = 4296299, upload-time = "2022-09-22T02:58:01.417Z" }, { url = "https://files.pythonhosted.org/packages/4d/ea/359a98f8b3b4ff9a2f457a0d20ed81775a64149fbb7617177ed23d9d10c9/grpcio-1.49.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dc79b2b37d779ac42341ddef40ad5bf0966a64af412c89fc2b062e3ddabb093f", size = 4656437, upload-time = "2022-09-22T02:58:06.23Z" }, { url = "https://files.pythonhosted.org/packages/fc/89/4952d2dff95f5b95db5943b2d1b55c82a485830b992f52f212b33616b523/grpcio-1.49.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:49b301740cf5bc8fed4fee4c877570189ae3951432d79fa8e524b09353659811", size = 4888051, upload-time = "2022-09-22T02:58:11.411Z" }, @@ -1081,7 +1080,6 @@ dependencies = [ ] sdist = { url = "https://files.pythonhosted.org/packages/6c/e4/3416d25aebc4477141a491fae2c9494c5e0437a706c59103a936aac7d072/grpcio-tools-1.49.1.tar.gz", hash = "sha256:84cc64e5b46bad43d5d7bd2fd772b656eba0366961187a847e908e2cb735db91", size = 2252679, upload-time = "2022-09-22T03:03:00.279Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/e4/c1/ba298fe650b67c9e31a7ad88b2fe1d8d22ff2c6a9e131604054835397dfc/grpcio_tools-1.49.1-cp311-cp311-linux_armv7l.whl", hash = "sha256:9e5c13809ab2f245398e8446c4c3b399a62d591db651e46806cccf52a700452e", size = 36912892, upload-time = "2022-09-22T03:00:51.237Z" }, { url = "https://files.pythonhosted.org/packages/9c/8b/a45a39bf7d1c4956d48179831e4da88c3f6ee14dbdcb273e575bbeb7de20/grpcio_tools-1.49.1-cp311-cp311-macosx_10_10_x86_64.whl", hash = "sha256:ab3d0ee9623720ee585fdf3753b3755d3144a4a8ae35bca8e3655fa2f41056be", size = 2025040, upload-time = "2022-09-22T03:00:55.219Z" }, { url = "https://files.pythonhosted.org/packages/6d/7f/89dc6036b91f8cbada98b06801ac2f5db60885000feaf88f9d7cabe665b7/grpcio_tools-1.49.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:13e13b3643e7577a3ec13b79689eb4d7548890b1e104c04b9ed6557a3c3dd452", size = 2370982, upload-time = "2022-09-22T03:00:59.807Z" }, { url = "https://files.pythonhosted.org/packages/01/98/4730bfff6bcd3163db8c3d70689e19a1a5f419152316edfc1f13ff06a5d7/grpcio_tools-1.49.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:a64bab81b220c50033f584f57978ebbea575f09c1ccee765cd5c462177988098", size = 2731915, upload-time = "2022-09-22T03:01:05.44Z" }, @@ -1587,7 +1585,7 @@ wheels = [ [[package]] name = "osprey-rpc" version = "0.1.0" -source = { virtual = "osprey_rpc" } +source = { editable = "osprey_rpc" } [[package]] name = "osprey-worker" @@ -1603,7 +1601,7 @@ dependencies = [ requires-dist = [ { name = "confluent-kafka", specifier = ">=2.14.0" }, { name = "flask-cors", specifier = ">=6.0.1" }, - { name = "osprey-rpc", virtual = "osprey_rpc" }, + { name = "osprey-rpc", editable = "osprey_rpc" }, ] [[package]] From 434eb4bf89c395706319ebcc6e4348ab0984714c Mon Sep 17 00:00:00 2001 From: hailey Date: Sun, 12 Apr 2026 22:50:12 +0000 Subject: [PATCH 03/10] replace consumer --- .../worker/_stdlibplugin/sink_register.py | 6 +-- osprey_worker/src/osprey/worker/cli/sinks.py | 45 +++++++++++++++---- .../worker/sinks/input_stream_chooser.py | 21 +++++---- .../osprey/worker/sinks/sink/input_stream.py | 28 +++++++----- .../src/osprey/worker/sinks/utils/gevent.py | 17 ------- .../src/osprey/worker/sinks/utils/kafka.py | 13 ------ pyproject.toml | 4 +- uv.lock | 11 +---- 8 files changed, 71 insertions(+), 74 deletions(-) delete mode 100644 osprey_worker/src/osprey/worker/sinks/utils/gevent.py delete mode 100644 osprey_worker/src/osprey/worker/sinks/utils/kafka.py diff --git a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py index 20a790a7..f8d39efd 100644 --- a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py +++ b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py @@ -1,6 +1,5 @@ from typing import List, Sequence -from kafka import KafkaProducer from osprey.worker._stdlibplugin.execution_result_store_chooser import get_rules_execution_result_storage_backend from osprey.worker.adaptor.plugin_manager import hookimpl_osprey from osprey.worker.lib.config import Config @@ -21,8 +20,9 @@ def register_output_sinks(config: Config) -> Sequence[BaseOutputSink]: client_id = config.expect_str('OSPREY_KAFKA_OUTPUT_CLIENT_ID') sinks.append( KafkaOutputSink( - kafka_topic=output_topic, - kafka_producer=KafkaProducer(bootstrap_servers=bootstrap_servers, client_id=client_id), + bootstrap_servers=bootstrap_servers, + output_topic=output_topic, + client_id=client_id, ) ) diff --git a/osprey_worker/src/osprey/worker/cli/sinks.py b/osprey_worker/src/osprey/worker/cli/sinks.py index 13f1af95..3786c893 100644 --- a/osprey_worker/src/osprey/worker/cli/sinks.py +++ b/osprey_worker/src/osprey/worker/cli/sinks.py @@ -23,7 +23,7 @@ import click import gevent -import kafka +from confluent_kafka import Consumer import sentry_sdk from google.api_core.exceptions import AlreadyExists from google.cloud import pubsub_v1 @@ -51,7 +51,6 @@ from osprey.worker.sinks.sink.input_stream import PostgresInputStream from osprey.worker.sinks.sink.osprey_coordinator_input_stream import OspreyCoordinatorInputStream from osprey.worker.sinks.sink.rules_sink import RulesSink -from osprey.worker.sinks.utils.kafka import PatchedKafkaConsumer LOGGER = get_logger() @@ -81,9 +80,23 @@ def tail_kafka_output_sink() -> None: output_topic = config.get_str('OSPREY_KAFKA_OUTPUT_SINK_TOPIC', 'osprey.execution_results') bootstrap_servers = config.get_str_list('OSPREY_KAFKA_BOOTSTRAP_SERVERS', ['localhost']) - kakfa_consumer = kafka.KafkaConsumer(output_topic, bootstrap_servers=bootstrap_servers) - for event in kakfa_consumer: - print(event) + consumer = Consumer({ + 'bootstrap.servers': ','.join(bootstrap_servers), + 'group.id': 'osprey-tail-output', + 'auto.offset.reset': 'latest', + }) + consumer.subscribe([output_topic]) + try: + while True: + msg = consumer.poll(timeout=1.0) + if msg is None: + continue + if msg.error(): + print(f'Error: {msg.error()}') + continue + print(msg.value()) + finally: + consumer.close() @cli.command() @@ -101,9 +114,25 @@ def tail_kafka_input_sink() -> None: client_id = config.get_str('OSPREY_KAFKA_INPUT_STREAM_CLIENT_ID', 'localhost') bootstrap_servers = config.get_str_list('OSPREY_KAFKA_BOOTSTRAP_SERVERS', ['localhost']) input_topic = config.get_str('OSPREY_KAFKA_INPUT_STREAM_TOPIC', 'osprey.actions_input') - kafka_consumer = PatchedKafkaConsumer(input_topic, bootstrap_servers=bootstrap_servers, client_id=client_id) - for event in kafka_consumer: - print(event) + + consumer = Consumer({ + 'bootstrap.servers': ','.join(bootstrap_servers), + 'client.id': client_id, + 'group.id': 'osprey-tail-input', + 'auto.offset.reset': 'latest', + }) + consumer.subscribe([input_topic]) + try: + while True: + msg = consumer.poll(timeout=1.0) + if msg is None: + continue + if msg.error(): + print(f'Error: {msg.error()}') + continue + print(msg.value()) + finally: + consumer.close() @cli.command() diff --git a/osprey_worker/src/osprey/worker/sinks/input_stream_chooser.py b/osprey_worker/src/osprey/worker/sinks/input_stream_chooser.py index b33c6d1e..663d97d3 100644 --- a/osprey_worker/src/osprey/worker/sinks/input_stream_chooser.py +++ b/osprey_worker/src/osprey/worker/sinks/input_stream_chooser.py @@ -3,7 +3,7 @@ from datetime import datetime, timedelta from google.cloud import pubsub_v1 -from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor +from confluent_kafka import Consumer from osprey.engine.executor.execution_context import Action from osprey.worker.adaptor.plugin_manager import bootstrap_input_stream from osprey.worker.lib.singletons import CONFIG @@ -15,7 +15,6 @@ ) from osprey.worker.sinks.sink.osprey_coordinator_input_stream import OspreyCoordinatorInputStream from osprey.worker.sinks.utils.acking_contexts import BaseAckingContext, NoopAckingContext -from osprey.worker.sinks.utils.kafka import PatchedKafkaConsumer def get_rules_sink_input_stream( @@ -113,13 +112,17 @@ def get_rules_sink_input_stream( if client_id_suffix: client_id = f'{client_id}-{client_id_suffix}' - consumer: PatchedKafkaConsumer = PatchedKafkaConsumer( - input_topic, - bootstrap_servers=input_bootstrap_servers, - client_id=client_id, - group_id=group_id, - partition_assignment_strategy=(RoundRobinPartitionAssignor,), - ) + consumer_config = { + 'bootstrap.servers': ','.join(input_bootstrap_servers), + 'client.id': client_id, + 'group.id': group_id or 'osprey-consumer', + 'partition.assignment.strategy': 'roundrobin', + 'auto.offset.reset': 'latest', + } + + consumer = Consumer(consumer_config) + consumer.subscribe([input_topic]) + from osprey.worker.sinks.sink.input_stream import KafkaInputStream return KafkaInputStream( diff --git a/osprey_worker/src/osprey/worker/sinks/sink/input_stream.py b/osprey_worker/src/osprey/worker/sinks/sink/input_stream.py index 55255c1c..5809e331 100644 --- a/osprey_worker/src/osprey/worker/sinks/sink/input_stream.py +++ b/osprey_worker/src/osprey/worker/sinks/sink/input_stream.py @@ -8,6 +8,8 @@ import gevent import msgpack import sentry_sdk +from confluent_kafka import Consumer +from confluent_kafka import Message as KafkaMessage from gevent.lock import RLock from gevent.queue import Queue as GeventQueue from google.api_core import retry @@ -16,7 +18,6 @@ from google.protobuf.message import DecodeError from google.protobuf.message import Message as ProtoMessage from google.pubsub_v1 import PubsubMessage -from kafka.consumer.fetcher import ConsumerRecord from osprey.engine.executor.execution_context import Action from osprey.worker.lib.encryption.envelope import Envelope from osprey.worker.lib.instruments import metrics @@ -29,7 +30,6 @@ PubSubMessageAckingContext, PullPubSubMessageContext, ) -from osprey.worker.sinks.utils.kafka import PatchedKafkaConsumer from pydantic import BaseModel from tenacity import RetryCallState, retry_if_exception_type, stop_never, wait_exponential from tenacity import retry as tenacity_retry @@ -413,22 +413,27 @@ def claim_with_retry() -> Optional[_ModelT]: class KafkaInputStream(BaseInputStream[BaseAckingContext[Action]]): """An input stream that consumes messages from a Kafka topic and yields Action objects wrapped in an AckingContext.""" - def __init__(self, kafka_consumer: PatchedKafkaConsumer): + def __init__(self, kafka_consumer: Consumer): super().__init__() - self._consumer: PatchedKafkaConsumer = kafka_consumer + self._consumer: Consumer = kafka_consumer def _gen(self) -> Iterator[BaseAckingContext[Action]]: while True: try: - with metrics.timed('kafka_consumer.lock_time'): - with metrics.timed('kafka_consumer.poll_time'): - record: ConsumerRecord = next(self._consumer) - data = json.loads(record.value) + with metrics.timed('kafka_consumer.poll_time'): + msg: Optional[KafkaMessage] = self._consumer.poll(timeout=1.0) + + if msg is None: + continue + + if msg.error(): + logger.error(f'Kafka consumer error: {msg.error()}') + sentry_sdk.capture_exception(Exception(str(msg.error()))) + continue + + data = json.loads(msg.value()) timestamp = parse_go_timestamp(data['send_time']) action_data = data['data'] - # this was here for when this was protobuf. If its json by default, we should just assume its all in one - # json blob. - # action_data = json.loads(action_data_json) action = Action( action_id=int(action_data['action_id']), @@ -436,7 +441,6 @@ def _gen(self) -> Iterator[BaseAckingContext[Action]]: data=action_data['data'], timestamp=timestamp, ) - # Wrap in NoopAckingContext for now, or implement a KafkaAckingContext if needed yield NoopAckingContext(action) except Exception as e: logger.exception('Error while consuming from Kafka') diff --git a/osprey_worker/src/osprey/worker/sinks/utils/gevent.py b/osprey_worker/src/osprey/worker/sinks/utils/gevent.py deleted file mode 100644 index 08a38259..00000000 --- a/osprey_worker/src/osprey/worker/sinks/utils/gevent.py +++ /dev/null @@ -1,17 +0,0 @@ -from typing import Optional - -from gevent import sleep -from gevent.lock import RLock - - -class FairRLock(RLock): # type: ignore - """A RLock that is fair, and will allow lock acquisitions that were attempted first to be acquired, - which is distinct from the way that RLock works in gevent, which will allow the greenlet that just released - the lock to re-acquire it, even if there is another greenlet waiting to acquire the lock, thus starving all - other lockers.""" - - def acquire(self, blocking: bool = True, timeout: Optional[float] = None) -> bool: - if blocking and not self._block.locked() and self._block.linkcount(): - sleep(0) - - return super().acquire(blocking=blocking, timeout=timeout) diff --git a/osprey_worker/src/osprey/worker/sinks/utils/kafka.py b/osprey_worker/src/osprey/worker/sinks/utils/kafka.py deleted file mode 100644 index 92e5a2d7..00000000 --- a/osprey_worker/src/osprey/worker/sinks/utils/kafka.py +++ /dev/null @@ -1,13 +0,0 @@ -from typing import Any - -from kafka import KafkaConsumer - -from .gevent import FairRLock - - -class PatchedKafkaConsumer(KafkaConsumer): # type: ignore - """A KafkaConsnsumer that has had its lock patched to FairRLock, that prevents deadlocking.""" - - def __init__(self, *args: Any, **kwargs: Any) -> None: - super().__init__(*args, **kwargs) - self._client._lock = FairRLock() diff --git a/pyproject.toml b/pyproject.toml index c71c627f..fec67fa5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,7 +45,7 @@ common = [ "hash-ring", "intervals==0.9.2", "jsonpath-rw", - "kafka-python==1.4.7", + "confluent-kafka>=2.14.0", "minio>=7.2.16", "mmh3==3.0.0", "msgpack==1.0.8", @@ -299,7 +299,7 @@ module = [ "deepmerge.*", "greenlet.*", "Levenshtein.*", - "kafka.*", + "confluent_kafka.*", "pythonjsonlogger.*", "msgpack.*", "jose.*", diff --git a/uv.lock b/uv.lock index bdada683..2fe8511b 100644 --- a/uv.lock +++ b/uv.lock @@ -31,6 +31,7 @@ overrides = [{ name = "ddtrace", specifier = ">=2.17.2,<3.0.0" }] common = [ { name = "blinker", specifier = "==1.9.0" }, { name = "click", specifier = "==7.1.2" }, + { name = "confluent-kafka", specifier = ">=2.14.0" }, { name = "datadog", specifier = "==0.51.0" }, { name = "ddtrace", specifier = "==2.21.11" }, { name = "deepmerge", specifier = "==0.3.0" }, @@ -70,7 +71,6 @@ common = [ { name = "hash-ring", url = "https://storage.googleapis.com/discord-devops/hash_ring-src-b4b56bc93053881b68b829ee9d1a4871b4aee592.zip" }, { name = "intervals", specifier = "==0.9.2" }, { name = "jsonpath-rw", url = "https://github.com/kennknowles/python-jsonpath-rw/archive/6f5647bb3ad2395c20f0191fef07a1df51c9fed8.tar.gz" }, - { name = "kafka-python", specifier = "==1.4.7" }, { name = "minio", specifier = ">=7.2.16" }, { name = "mmh3", specifier = "==3.0.0" }, { name = "msgpack", specifier = "==1.0.8" }, @@ -1316,15 +1316,6 @@ requires-dist = [ { name = "six" }, ] -[[package]] -name = "kafka-python" -version = "1.4.7" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/58/20/2a2494fe7b4bf4cb36c060f88a55adcd944b7876877691a2f28cd544467b/kafka-python-1.4.7.tar.gz", hash = "sha256:2f29baad4b3efe05a2bb81ac268855aa01cbc68397f15bac77b494ffd7e2cada", size = 290977, upload-time = "2019-09-30T21:13:20.078Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/49/c9/9863483a1353700ba87821b4f39085eb18fd1bcbb1e954c697177d67f03f/kafka_python-1.4.7-py2.py3-none-any.whl", hash = "sha256:4fbebebfcb6fc94903fb720fe883d7bbec7298f4f1acb857c21dd3b4b114ba4b", size = 266121, upload-time = "2019-09-30T21:13:17.697Z" }, -] - [[package]] name = "legacy-cgi" version = "2.6.4" From bddcdee0add98b2dccad5a018b472d9cb90c445c Mon Sep 17 00:00:00 2001 From: hailey Date: Mon, 13 Apr 2026 00:02:59 +0000 Subject: [PATCH 04/10] fix config value --- osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py b/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py index a2a9b370..29bce629 100644 --- a/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py +++ b/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py @@ -68,7 +68,7 @@ def __init__( 'enable.idempotence': True, 'acks': 'all', 'max.in.flight.requests.per.connection': 5, - 'max.message.bytes': 20_000_000, + 'message.max.bytes': 20_000_000, } self.producer = Producer(config) From 647f5bd2b0b3b2fe4f559f8b9c903bce73115df1 Mon Sep 17 00:00:00 2001 From: hailey Date: Mon, 13 Apr 2026 00:21:03 +0000 Subject: [PATCH 05/10] attempt to fix gevent woes --- .../worker/sinks/input_stream_chooser.py | 4 +- .../osprey/worker/sinks/sink/input_stream.py | 6 +- .../worker/sinks/sink/kafka_output_sink.py | 38 ++----- .../src/osprey/worker/sinks/utils/kafka.py | 101 ++++++++++++++++++ 4 files changed, 117 insertions(+), 32 deletions(-) create mode 100644 osprey_worker/src/osprey/worker/sinks/utils/kafka.py diff --git a/osprey_worker/src/osprey/worker/sinks/input_stream_chooser.py b/osprey_worker/src/osprey/worker/sinks/input_stream_chooser.py index 663d97d3..6ff4053e 100644 --- a/osprey_worker/src/osprey/worker/sinks/input_stream_chooser.py +++ b/osprey_worker/src/osprey/worker/sinks/input_stream_chooser.py @@ -4,6 +4,7 @@ from google.cloud import pubsub_v1 from confluent_kafka import Consumer +from osprey.worker.sinks.utils.kafka import ThreadedKafkaConsumer from osprey.engine.executor.execution_context import Action from osprey.worker.adaptor.plugin_manager import bootstrap_input_stream from osprey.worker.lib.singletons import CONFIG @@ -122,11 +123,12 @@ def get_rules_sink_input_stream( consumer = Consumer(consumer_config) consumer.subscribe([input_topic]) + threaded_consumer = ThreadedKafkaConsumer(consumer) from osprey.worker.sinks.sink.input_stream import KafkaInputStream return KafkaInputStream( - kafka_consumer=consumer, + kafka_consumer=threaded_consumer, ) elif input_stream_source == InputStreamSource.PLUGIN: stream = bootstrap_input_stream(config=config) diff --git a/osprey_worker/src/osprey/worker/sinks/sink/input_stream.py b/osprey_worker/src/osprey/worker/sinks/sink/input_stream.py index 5809e331..ed2ca1ca 100644 --- a/osprey_worker/src/osprey/worker/sinks/sink/input_stream.py +++ b/osprey_worker/src/osprey/worker/sinks/sink/input_stream.py @@ -8,7 +8,6 @@ import gevent import msgpack import sentry_sdk -from confluent_kafka import Consumer from confluent_kafka import Message as KafkaMessage from gevent.lock import RLock from gevent.queue import Queue as GeventQueue @@ -43,6 +42,7 @@ if TYPE_CHECKING: from google.cloud.pubsub_v1.types import PullResponse + from osprey.worker.sinks.utils.kafka import ThreadedKafkaConsumer class BaseInputStream(abc.ABC, Generic[_T]): @@ -413,9 +413,9 @@ def claim_with_retry() -> Optional[_ModelT]: class KafkaInputStream(BaseInputStream[BaseAckingContext[Action]]): """An input stream that consumes messages from a Kafka topic and yields Action objects wrapped in an AckingContext.""" - def __init__(self, kafka_consumer: Consumer): + def __init__(self, kafka_consumer: 'ThreadedKafkaConsumer'): super().__init__() - self._consumer: Consumer = kafka_consumer + self._consumer = kafka_consumer def _gen(self) -> Iterator[BaseAckingContext[Action]]: while True: diff --git a/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py b/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py index 29bce629..134b6f61 100644 --- a/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py +++ b/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py @@ -7,6 +7,7 @@ from osprey.engine.executor.execution_context import ExecutionResult from osprey.worker.lib.osprey_shared.logging import get_logger from osprey.worker.sinks.sink.output_sink import BaseOutputSink +from osprey.worker.sinks.utils.kafka import ThreadedKafkaProducer logger = get_logger() @@ -27,7 +28,6 @@ def __init__( bootstrap_servers: list[str], output_topic: str, client_id: str | None, - poll_every: int = 20, ) -> None: if len(bootstrap_servers) == 0: raise EmptyBootstrapServersException() @@ -39,8 +39,6 @@ def __init__( self._bootstrap_servers = bootstrap_servers self._output_topic = output_topic - self._poll_every = poll_every - self._message_count = 0 # NOTE(haileyok): this is...not necessary probably self.topic_ensured = False @@ -71,7 +69,8 @@ def __init__( 'message.max.bytes': 20_000_000, } - self.producer = Producer(config) + self._producer = Producer(config) + self._threaded_producer = ThreadedKafkaProducer(self._producer) self.ensure_topic() super().__init__() @@ -100,39 +99,22 @@ def ensure_topic(self) -> None: except Exception as e: self.logger.error(f'Error creating topic, unable to ensure topic: {e}') - def _handle_polling(self) -> None: - """Poll periodically based on message count""" - self._message_count += 1 - if self._message_count % self._poll_every == 0: - self.producer.poll(0) - def will_do_work(self, result: ExecutionResult) -> bool: return True def push(self, result: ExecutionResult) -> None: - try: - self.producer.produce( - self._output_topic, - value=result.extracted_features_json.encode('utf-8'), - on_delivery=self._on_delivery, - ) - except BufferError: - self.logger.warning('Producer queue full, flushing before retry') - self.producer.flush(timeout=5) - self.producer.produce( - self._output_topic, - value=result.extracted_features_json.encode('utf-8'), - on_delivery=self._on_delivery, - ) - - self._handle_polling() + self._threaded_producer.produce( + self._output_topic, + value=result.extracted_features_json.encode('utf-8'), + on_delivery=self._on_delivery, + ) def _on_delivery(self, err: Any, msg: Any) -> None: if err is not None: self.push_err_to_sentry(err) def flush(self, timeout: float = 30) -> int: - return self.producer.flush(timeout) + return self._threaded_producer.flush(timeout) @classmethod def push_err_to_sentry(cls, e: Exception) -> None: @@ -140,6 +122,6 @@ def push_err_to_sentry(cls, e: Exception) -> None: sentry_sdk.capture_exception(error=e) def stop(self) -> None: - remaining = self.flush(timeout=10) + remaining = self._threaded_producer.close(timeout=10) if remaining > 0: logger.warning(f'{remaining} messages were not delivered') diff --git a/osprey_worker/src/osprey/worker/sinks/utils/kafka.py b/osprey_worker/src/osprey/worker/sinks/utils/kafka.py new file mode 100644 index 00000000..cfae8721 --- /dev/null +++ b/osprey_worker/src/osprey/worker/sinks/utils/kafka.py @@ -0,0 +1,101 @@ +"""Threaded wrappers for confluent-kafka that keep librdkafka's C threads off the gevent event loop. + +confluent-kafka uses librdkafka (C) under the hood. Its internal threads and network I/O bypass +gevent's monkey-patching entirely, which means they can hold the GIL and starve greenlets. These +wrappers run all confluent-kafka operations on dedicated OS threads and bridge back to greenlet-land +via gevent queues. +""" + +import threading +from typing import Any, Optional + +import gevent.queue +import sentry_sdk +from confluent_kafka import Consumer, KafkaError, KafkaException, Message, Producer +from osprey.worker.lib.osprey_shared.logging import get_logger + +logger = get_logger() + +_SENTINEL = object() + + +class ThreadedKafkaConsumer: + """Wraps a confluent-kafka Consumer, polling on a real OS thread and delivering messages via a gevent queue.""" + + def __init__(self, consumer: Consumer, queue_maxsize: int = 1000) -> None: + self._consumer = consumer + self._queue: gevent.queue.Queue[Optional[Message]] = gevent.queue.Queue(maxsize=queue_maxsize) + self._stop_event = threading.Event() + self._thread = threading.Thread(target=self._poll_loop, daemon=True, name='kafka-consumer-thread') + self._thread.start() + + def _poll_loop(self) -> None: + while not self._stop_event.is_set(): + try: + msg = self._consumer.poll(timeout=1.0) + if msg is not None: + self._queue.put(msg) + except KafkaException as e: + logger.error(f'Kafka consumer poll error: {e}') + sentry_sdk.capture_exception(e) + + def poll(self, timeout: Optional[float] = None) -> Optional[Message]: + """Get the next message from the gevent queue. Safe to call from a greenlet.""" + try: + return self._queue.get(timeout=timeout) + except gevent.queue.Empty: + return None + + def close(self) -> None: + self._stop_event.set() + self._thread.join(timeout=5) + self._consumer.close() + + +class ThreadedKafkaProducer: + """Wraps a confluent-kafka Producer, running produce/poll on a real OS thread.""" + + def __init__(self, producer: Producer, poll_interval: float = 0.1) -> None: + self._producer = producer + self._queue: gevent.queue.Queue = gevent.queue.Queue() + self._poll_interval = poll_interval + self._stop_event = threading.Event() + self._thread = threading.Thread(target=self._produce_loop, daemon=True, name='kafka-producer-thread') + self._thread.start() + + def _produce_loop(self) -> None: + while not self._stop_event.is_set(): + self._producer.poll(0) + + try: + item = self._queue.get_nowait() + except gevent.queue.Empty: + # No pending produce requests — just poll and sleep briefly + self._stop_event.wait(self._poll_interval) + continue + + if item is _SENTINEL: + break + + topic, value, on_delivery = item + try: + self._producer.produce(topic, value=value, on_delivery=on_delivery) + except BufferError: + logger.warning('Producer queue full, flushing before retry') + self._producer.flush(timeout=5) + self._producer.produce(topic, value=value, on_delivery=on_delivery) + + def produce(self, topic: str, value: bytes, on_delivery: Any = None) -> None: + """Queue a message for production. Safe to call from a greenlet.""" + self._queue.put((topic, value, on_delivery)) + + def flush(self, timeout: float = 30) -> int: + """Flush pending messages. Blocks until complete.""" + return self._producer.flush(timeout) + + def close(self, timeout: float = 10) -> int: + """Stop the producer thread and flush remaining messages.""" + self._stop_event.set() + self._queue.put(_SENTINEL) + self._thread.join(timeout=5) + return self._producer.flush(timeout) From 9ba3cb7dc465cbe80c83e13bfb51adc3ce25de8a Mon Sep 17 00:00:00 2001 From: hailey Date: Mon, 13 Apr 2026 00:33:33 +0000 Subject: [PATCH 06/10] more adjustements --- .../src/osprey/worker/sinks/utils/kafka.py | 37 +++++++++---------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/osprey_worker/src/osprey/worker/sinks/utils/kafka.py b/osprey_worker/src/osprey/worker/sinks/utils/kafka.py index cfae8721..394c8c7d 100644 --- a/osprey_worker/src/osprey/worker/sinks/utils/kafka.py +++ b/osprey_worker/src/osprey/worker/sinks/utils/kafka.py @@ -1,23 +1,27 @@ """Threaded wrappers for confluent-kafka that keep librdkafka's C threads off the gevent event loop. -confluent-kafka uses librdkafka (C) under the hood. Its internal threads and network I/O bypass +confluent-kafka uses librdkafka under the hood. Its internal threads and network I/O bypass gevent's monkey-patching entirely, which means they can hold the GIL and starve greenlets. These -wrappers run all confluent-kafka operations on dedicated OS threads and bridge back to greenlet-land -via gevent queues. +wrappers run all confluent-kafka operations on dedicated OS threads instead of greenlets, then +bridge back to greenlet-land via gevent async watchers. """ -import threading from typing import Any, Optional +import gevent._threading as _real_threading import gevent.queue import sentry_sdk -from confluent_kafka import Consumer, KafkaError, KafkaException, Message, Producer +from confluent_kafka import Consumer, KafkaException, Message, Producer from osprey.worker.lib.osprey_shared.logging import get_logger logger = get_logger() _SENTINEL = object() +import gevent.monkey + +_real_sleep = gevent.monkey.get_original('time', 'sleep') + class ThreadedKafkaConsumer: """Wraps a confluent-kafka Consumer, polling on a real OS thread and delivering messages via a gevent queue.""" @@ -25,12 +29,11 @@ class ThreadedKafkaConsumer: def __init__(self, consumer: Consumer, queue_maxsize: int = 1000) -> None: self._consumer = consumer self._queue: gevent.queue.Queue[Optional[Message]] = gevent.queue.Queue(maxsize=queue_maxsize) - self._stop_event = threading.Event() - self._thread = threading.Thread(target=self._poll_loop, daemon=True, name='kafka-consumer-thread') - self._thread.start() + self._running = True + self._thread_ident = _real_threading.start_new_thread(self._poll_loop, ()) def _poll_loop(self) -> None: - while not self._stop_event.is_set(): + while self._running: try: msg = self._consumer.poll(timeout=1.0) if msg is not None: @@ -47,8 +50,7 @@ def poll(self, timeout: Optional[float] = None) -> Optional[Message]: return None def close(self) -> None: - self._stop_event.set() - self._thread.join(timeout=5) + self._running = False self._consumer.close() @@ -59,19 +61,17 @@ def __init__(self, producer: Producer, poll_interval: float = 0.1) -> None: self._producer = producer self._queue: gevent.queue.Queue = gevent.queue.Queue() self._poll_interval = poll_interval - self._stop_event = threading.Event() - self._thread = threading.Thread(target=self._produce_loop, daemon=True, name='kafka-producer-thread') - self._thread.start() + self._running = True + self._thread_ident = _real_threading.start_new_thread(self._produce_loop, ()) def _produce_loop(self) -> None: - while not self._stop_event.is_set(): + while self._running: self._producer.poll(0) try: item = self._queue.get_nowait() except gevent.queue.Empty: - # No pending produce requests — just poll and sleep briefly - self._stop_event.wait(self._poll_interval) + _real_sleep(self._poll_interval) continue if item is _SENTINEL: @@ -95,7 +95,6 @@ def flush(self, timeout: float = 30) -> int: def close(self, timeout: float = 10) -> int: """Stop the producer thread and flush remaining messages.""" - self._stop_event.set() + self._running = False self._queue.put(_SENTINEL) - self._thread.join(timeout=5) return self._producer.flush(timeout) From 14b3960d0bba9af77525e1ee6f6212f1a03ed840 Mon Sep 17 00:00:00 2001 From: VINODvoid Date: Thu, 14 May 2026 15:09:31 +0530 Subject: [PATCH 07/10] make topic creation and partition config configurable via env vars --- .../src/osprey/worker/_stdlibplugin/sink_register.py | 6 ++++++ .../osprey/worker/sinks/sink/kafka_output_sink.py | 12 +++++++++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py index f8d39efd..cb95fa1c 100644 --- a/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py +++ b/osprey_worker/src/osprey/worker/_stdlibplugin/sink_register.py @@ -18,11 +18,17 @@ def register_output_sinks(config: Config) -> Sequence[BaseOutputSink]: output_topic = config.expect_str('OSPREY_KAFKA_OUTPUT_TOPIC') bootstrap_servers = config.expect_str_list('OSPREY_KAFKA_BOOTSTRAP_SERVERS') client_id = config.expect_str('OSPREY_KAFKA_OUTPUT_CLIENT_ID') + auto_create_topic = config.get_bool('OSPREY_KAFKA_AUTO_CREATE_TOPIC', True) + num_partitions = config.get_int('OSPREY_KAFKA_NUM_PARTITIONS', 1) + replication_factor = config.get_int('OSPREY_KAFKA_REPLICATION_FACTOR', 1) sinks.append( KafkaOutputSink( bootstrap_servers=bootstrap_servers, output_topic=output_topic, client_id=client_id, + auto_create_topic=auto_create_topic, + num_partitions=num_partitions, + replication_factor=replication_factor, ) ) diff --git a/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py b/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py index 134b6f61..446ffcce 100644 --- a/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py +++ b/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py @@ -28,6 +28,9 @@ def __init__( bootstrap_servers: list[str], output_topic: str, client_id: str | None, + auto_create_topic: bool = True, + num_partitions: int = 1, + replication_factor: int = 1, ) -> None: if len(bootstrap_servers) == 0: raise EmptyBootstrapServersException() @@ -39,6 +42,8 @@ def __init__( self._bootstrap_servers = bootstrap_servers self._output_topic = output_topic + self._num_partitions = num_partitions + self._replication_factor = replication_factor # NOTE(haileyok): this is...not necessary probably self.topic_ensured = False @@ -71,7 +76,9 @@ def __init__( self._producer = Producer(config) self._threaded_producer = ThreadedKafkaProducer(self._producer) - self.ensure_topic() + + if auto_create_topic: + self.ensure_topic() super().__init__() @@ -91,8 +98,7 @@ def ensure_topic(self) -> None: self.logger.info(f'Creating topic {self._output_topic}') try: - # TODO: num_partitions and replication_factor should not be hard coded... - topic = NewTopic(self._output_topic, num_partitions=3, replication_factor=3) + topic = NewTopic(self._output_topic, num_partitions=self._num_partitions, replication_factor=self._replication_factor) fs = admin_client.create_topics([topic]) fs[self._output_topic].result() self.topic_ensured = True From 820ea0e709e17525f1c7d6e71d17d96f08ef3cc6 Mon Sep 17 00:00:00 2001 From: VINODvoid Date: Thu, 14 May 2026 15:17:50 +0530 Subject: [PATCH 08/10] fix ruff import ordering issues --- osprey_worker/src/osprey/worker/cli/sinks.py | 2 +- osprey_worker/src/osprey/worker/sinks/input_stream_chooser.py | 4 ++-- osprey_worker/src/osprey/worker/sinks/utils/kafka.py | 4 +--- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/osprey_worker/src/osprey/worker/cli/sinks.py b/osprey_worker/src/osprey/worker/cli/sinks.py index 3786c893..33a9030c 100644 --- a/osprey_worker/src/osprey/worker/cli/sinks.py +++ b/osprey_worker/src/osprey/worker/cli/sinks.py @@ -23,8 +23,8 @@ import click import gevent -from confluent_kafka import Consumer import sentry_sdk +from confluent_kafka import Consumer from google.api_core.exceptions import AlreadyExists from google.cloud import pubsub_v1 from osprey.worker.lib import instruments diff --git a/osprey_worker/src/osprey/worker/sinks/input_stream_chooser.py b/osprey_worker/src/osprey/worker/sinks/input_stream_chooser.py index 6ff4053e..59af6604 100644 --- a/osprey_worker/src/osprey/worker/sinks/input_stream_chooser.py +++ b/osprey_worker/src/osprey/worker/sinks/input_stream_chooser.py @@ -2,9 +2,8 @@ import random from datetime import datetime, timedelta -from google.cloud import pubsub_v1 from confluent_kafka import Consumer -from osprey.worker.sinks.utils.kafka import ThreadedKafkaConsumer +from google.cloud import pubsub_v1 from osprey.engine.executor.execution_context import Action from osprey.worker.adaptor.plugin_manager import bootstrap_input_stream from osprey.worker.lib.singletons import CONFIG @@ -16,6 +15,7 @@ ) from osprey.worker.sinks.sink.osprey_coordinator_input_stream import OspreyCoordinatorInputStream from osprey.worker.sinks.utils.acking_contexts import BaseAckingContext, NoopAckingContext +from osprey.worker.sinks.utils.kafka import ThreadedKafkaConsumer def get_rules_sink_input_stream( diff --git a/osprey_worker/src/osprey/worker/sinks/utils/kafka.py b/osprey_worker/src/osprey/worker/sinks/utils/kafka.py index 394c8c7d..b8f756b3 100644 --- a/osprey_worker/src/osprey/worker/sinks/utils/kafka.py +++ b/osprey_worker/src/osprey/worker/sinks/utils/kafka.py @@ -9,6 +9,7 @@ from typing import Any, Optional import gevent._threading as _real_threading +import gevent.monkey import gevent.queue import sentry_sdk from confluent_kafka import Consumer, KafkaException, Message, Producer @@ -17,9 +18,6 @@ logger = get_logger() _SENTINEL = object() - -import gevent.monkey - _real_sleep = gevent.monkey.get_original('time', 'sleep') From 38abf3a735a35fdbf813ded4886f6938be386025 Mon Sep 17 00:00:00 2001 From: VINODvoid Date: Thu, 14 May 2026 15:19:53 +0530 Subject: [PATCH 09/10] fix ruff format issues --- osprey_worker/src/osprey/worker/cli/sinks.py | 26 +++++++++++-------- .../worker/sinks/sink/kafka_output_sink.py | 4 ++- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/osprey_worker/src/osprey/worker/cli/sinks.py b/osprey_worker/src/osprey/worker/cli/sinks.py index 33a9030c..6588eaf9 100644 --- a/osprey_worker/src/osprey/worker/cli/sinks.py +++ b/osprey_worker/src/osprey/worker/cli/sinks.py @@ -80,11 +80,13 @@ def tail_kafka_output_sink() -> None: output_topic = config.get_str('OSPREY_KAFKA_OUTPUT_SINK_TOPIC', 'osprey.execution_results') bootstrap_servers = config.get_str_list('OSPREY_KAFKA_BOOTSTRAP_SERVERS', ['localhost']) - consumer = Consumer({ - 'bootstrap.servers': ','.join(bootstrap_servers), - 'group.id': 'osprey-tail-output', - 'auto.offset.reset': 'latest', - }) + consumer = Consumer( + { + 'bootstrap.servers': ','.join(bootstrap_servers), + 'group.id': 'osprey-tail-output', + 'auto.offset.reset': 'latest', + } + ) consumer.subscribe([output_topic]) try: while True: @@ -115,12 +117,14 @@ def tail_kafka_input_sink() -> None: bootstrap_servers = config.get_str_list('OSPREY_KAFKA_BOOTSTRAP_SERVERS', ['localhost']) input_topic = config.get_str('OSPREY_KAFKA_INPUT_STREAM_TOPIC', 'osprey.actions_input') - consumer = Consumer({ - 'bootstrap.servers': ','.join(bootstrap_servers), - 'client.id': client_id, - 'group.id': 'osprey-tail-input', - 'auto.offset.reset': 'latest', - }) + consumer = Consumer( + { + 'bootstrap.servers': ','.join(bootstrap_servers), + 'client.id': client_id, + 'group.id': 'osprey-tail-input', + 'auto.offset.reset': 'latest', + } + ) consumer.subscribe([input_topic]) try: while True: diff --git a/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py b/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py index 446ffcce..c091e19b 100644 --- a/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py +++ b/osprey_worker/src/osprey/worker/sinks/sink/kafka_output_sink.py @@ -98,7 +98,9 @@ def ensure_topic(self) -> None: self.logger.info(f'Creating topic {self._output_topic}') try: - topic = NewTopic(self._output_topic, num_partitions=self._num_partitions, replication_factor=self._replication_factor) + topic = NewTopic( + self._output_topic, num_partitions=self._num_partitions, replication_factor=self._replication_factor + ) fs = admin_client.create_topics([topic]) fs[self._output_topic].result() self.topic_ensured = True From f8e3cfdf491e543acf71b1d2c90ed8e18c2008f2 Mon Sep 17 00:00:00 2001 From: VINODvoid Date: Thu, 14 May 2026 15:26:41 +0530 Subject: [PATCH 10/10] fix mypy error for nullable msg.value() from confluent-kafka --- osprey_worker/src/osprey/worker/sinks/sink/input_stream.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/osprey_worker/src/osprey/worker/sinks/sink/input_stream.py b/osprey_worker/src/osprey/worker/sinks/sink/input_stream.py index ed2ca1ca..f3eb664d 100644 --- a/osprey_worker/src/osprey/worker/sinks/sink/input_stream.py +++ b/osprey_worker/src/osprey/worker/sinks/sink/input_stream.py @@ -431,7 +431,11 @@ def _gen(self) -> Iterator[BaseAckingContext[Action]]: sentry_sdk.capture_exception(Exception(str(msg.error()))) continue - data = json.loads(msg.value()) + value = msg.value() + if value is None: + continue + + data = json.loads(value) timestamp = parse_go_timestamp(data['send_time']) action_data = data['data']