refactor osprey to use confluent-kafka instead of kafka-python #220

Draft
haileyok wants to merge 7 commits into main from hailey/confluent-kafka

Conversation

@haileyok
Member

@haileyok haileyok commented Apr 12, 2026

Description

Will fill out more later, but here I'm removing kafka-python and replacing it with confluent-kafka. Fixes #229

Checklist

  • Tests pass locally
  • uv run ruff check . passes (no unused imports or other lint errors)
  • uv tool run fawltydeps --check-unused --pyenv .venv passes (no unused dependencies)
  • Updated CHANGELOG.md with my changes, if applicable

Comment thread osprey_worker/src/osprey/worker/sinks/utils/kafka.py Fixed

self._producer = Producer(config)
self._threaded_producer = ThreadedKafkaProducer(self._producer)
self.ensure_topic()
Contributor

It may be worth making this optional: in some environments you don't want dynamic topic creation and instead manage topics with, say, Terraform (we did this with IFTAS CCS, so that anything exploiting our code couldn't gain admin access to Kafka).

Member Author

ya i agree, i might just rip this code out entirely but maybe ill make it some environment variable that you can toggle on...idk yet
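
For illustration, the toggle discussed above could look roughly like this. It is a minimal sketch, not code from this PR: the variable name OSPREY_KAFKA_ENSURE_TOPIC, the helper env_flag, and the stand-in class are all hypothetical, and the default is off so that externally managed topics (e.g. via Terraform) remain the norm.

```python
import os


def env_flag(name: str, default: bool = False) -> bool:
    """Read a boolean toggle from the environment (hypothetical helper)."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


class KafkaSinkSketch:
    """Stand-in for the sink in this PR; only the toggle logic is shown."""

    def __init__(self) -> None:
        self.topic_ensured = False
        # OSPREY_KAFKA_ENSURE_TOPIC is an assumed name, not an existing setting.
        if env_flag("OSPREY_KAFKA_ENSURE_TOPIC"):
            self.ensure_topic()

    def ensure_topic(self) -> None:
        # The real method would call the Kafka admin API; here we only record the call.
        self.topic_ensured = True
```

With the flag unset, the sink never touches the admin API, so it would not need topic-creation permissions on the cluster.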

Comment on lines +95 to +96
# TODO: num_partitions and replication_factor should not be hard coded...
topic = NewTopic(self._output_topic, num_partitions=3, replication_factor=3)
Contributor

@chimosky chimosky Apr 13, 2026

The only other place num_partitions and replication_factor are mentioned is in the test-data producer. They're not set in any environment, so this should be okay, unless you're thinking of setting them as environment variables.

Member Author

yea this would need to be an environment variable for sure. like emelia's comment though, this is kinda an infra thing that might not belong in here and i might just end up removing it.
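
If the values do end up coming from the environment, a sketch along these lines might work. Both variable names (OSPREY_KAFKA_NUM_PARTITIONS, OSPREY_KAFKA_REPLICATION_FACTOR) and the helper names are assumptions for illustration; the defaults simply mirror the hard-coded 3/3 in the diff.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class TopicSettings:
    num_partitions: int
    replication_factor: int


def _positive_int(name: str, default: int) -> int:
    """Parse a positive integer from the environment, falling back to a default."""
    raw = os.environ.get(name)
    if raw is None or raw.strip() == "":
        return default
    value = int(raw)  # raises ValueError on non-numeric input
    if value < 1:
        raise ValueError(f"{name} must be >= 1, got {value}")
    return value


def load_topic_settings() -> TopicSettings:
    # Both variable names are assumed; defaults match the values hard coded in the diff.
    return TopicSettings(
        num_partitions=_positive_int("OSPREY_KAFKA_NUM_PARTITIONS", 3),
        replication_factor=_positive_int("OSPREY_KAFKA_REPLICATION_FACTOR", 3),
    )
```

The result could then be passed to the existing call, e.g. NewTopic(self._output_topic, num_partitions=settings.num_partitions, replication_factor=settings.replication_factor).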

Comment on lines +77 to +78
if item is _SENTINEL:
break
Contributor

This is for my own curiosity: why did you do this?
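
For context, a check like this is the usual way to stop a queue-draining worker thread: a unique object is put on the queue and the loop exits when it sees it, so that None or any real payload cannot be mistaken for the stop signal. Whether that is the motivation here is an assumption. The sketch below uses only the standard library and hypothetical names (drain, run_demo) rather than the PR's ThreadedKafkaProducer.

```python
import queue
import threading

_SENTINEL = object()  # unique marker; an identity check cannot collide with real items


def drain(q: queue.Queue, out: list) -> None:
    """Consume items until the sentinel arrives."""
    while True:
        item = q.get()
        if item is _SENTINEL:
            break  # shutdown requested; stop consuming
        out.append(item)


def run_demo() -> list:
    q: queue.Queue = queue.Queue()
    results: list = []
    worker = threading.Thread(target=drain, args=(q, results))
    worker.start()
    for item in ("a", None, "b"):  # None is a valid payload here
        q.put(item)
    q.put(_SENTINEL)  # everything queued before this is still processed
    worker.join()
    return results
```

Because the sentinel is enqueued like any other item, everything put on the queue before it is handled first, which gives an orderly shutdown without a separate stop flag.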

import threading
from typing import Any, Optional

import gevent._threading as _real_threading
Contributor

I'm curious why you did this rather than what you did with _real_sleep. Your commit message didn't say why, and it's not obvious.

@chimosky
Contributor

I also think #221 should be included here. It would make testing easier: its absence didn't break anything on main, but testing this PR without it is impossible.

@chimosky
Contributor

Tested, and I still notice the same thing reported in #213. I'll test again later.

Development

Successfully merging this pull request may close these issues.

Replace kafka-python with confluent-kafka

3 participants