Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: wrap fetch_watermarks in spawn_blocking #196

Merged
merged 1 commit into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## rdkafka [0.3.3] - 2024-02-28

### Changed

- Wrap `fetch_watermarks` in `tokio::task::spawn_blocking`.

## rdkafka [0.3.2] - 2024-02-28

### Changed
Expand Down
2 changes: 1 addition & 1 deletion madsim-rdkafka/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "madsim-rdkafka"
version = "0.3.2+0.34.0"
version = "0.3.3+0.34.0"
edition = "2021"
authors = ["Runji Wang <[email protected]>"]
description = "The rdkafka simulator on madsim."
Expand Down
6 changes: 4 additions & 2 deletions madsim-rdkafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ The following functions are modified to be `async`:
- `ClientConfig::create`
- `ClientConfig::create_with_context`
- `Client::fetch_metadata`
- `Client::fetch_watermarks`
- `Client::fetch_watermarks`[^1]
- `Client::fetch_group_list`
- `Consumer::seek`
- `Consumer::seek_partitions`
Expand All @@ -37,14 +37,16 @@ The following functions are modified to be `async`:
- `Consumer::offsets_for_timestamp`
- `Consumer::offsets_for_times`
- `Consumer::fetch_metadata`
- `Consumer::fetch_watermarks`
- `Consumer::fetch_watermarks`[^1]
- `Consumer::fetch_group_list`
- `Producer::flush`
- `Producer::init_transactions`
- `Producer::send_offsets_to_transaction`
- `Producer::commit_transaction`
- `Producer::abort_transaction`

[^1]: wrapped in `tokio::task::spawn_blocking`

## DNS Resolution

This crate has cherry-picked [a commit] from Materialize to support rewriting broker addresses.
Expand Down
27 changes: 15 additions & 12 deletions madsim-rdkafka/src/std/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,29 +354,32 @@ impl<C: ClientContext> Client<C> {
}

/// Returns high and low watermark for the specified topic and partition.
pub async fn fetch_watermarks<T: Into<Timeout>>(
pub async fn fetch_watermarks<T: Into<Timeout> + Send + 'static>(
&self,
topic: &str,
partition: i32,
timeout: T,
) -> KafkaResult<(i64, i64)> {
let mut low = -1;
let mut high = -1;
let topic_c = CString::new(topic.to_string())?;
let ret = unsafe {
rdsys::rd_kafka_query_watermark_offsets(
self.native_ptr(),
let native_client = unsafe { NativeClient::from_ptr(self.native_ptr()) };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NativeClient will call rd_kafka_destroy when it's get dropped (behavior of NativePtr). Will this lead to problem?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohhh. I think so. Thanks for your troubleshooting!

tokio::task::spawn_blocking(move || unsafe {
let mut low = -1;
let mut high = -1;
let ret = rdsys::rd_kafka_query_watermark_offsets(
native_client.ptr(),
topic_c.as_ptr(),
partition,
&mut low as *mut i64,
&mut high as *mut i64,
timeout.into().as_millis(),
)
};
if ret.is_error() {
return Err(KafkaError::MetadataFetch(ret.into()));
}
Ok((low, high))
);
if ret.is_error() {
return Err(KafkaError::MetadataFetch(ret.into()));
}
Ok((low, high))
})
.await
.unwrap()
}

/// Returns the cluster identifier option or None if the cluster identifier is null
Expand Down
2 changes: 1 addition & 1 deletion madsim-rdkafka/src/std/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ where
self.client.fetch_metadata(topic, timeout).await
}

async fn fetch_watermarks<T: Into<Timeout> + Send>(
async fn fetch_watermarks<T: Into<Timeout> + Send + 'static>(
&self,
topic: &str,
partition: i32,
Expand Down
2 changes: 1 addition & 1 deletion madsim-rdkafka/src/std/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ where
timeout: T,
) -> KafkaResult<(i64, i64)>
where
T: Into<Timeout> + Send,
T: Into<Timeout> + Send + 'static,
Self: Sized;

/// Returns the group membership information for the given group. If no group is
Expand Down
2 changes: 1 addition & 1 deletion madsim-rdkafka/src/std/consumer/stream_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ where
timeout: T,
) -> KafkaResult<(i64, i64)>
where
T: Into<Timeout> + Send,
T: Into<Timeout> + Send + 'static,
Self: Sized,
{
self.base.fetch_watermarks(topic, partition, timeout).await
Expand Down
Loading