Skip to content

Commit

Permalink
Capture errors from connecting to broker
Browse files Browse the repository at this point in the history
  • Loading branch information
einarmo committed Jan 23, 2024
1 parent 1eba7e3 commit 9b70a52
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 5 deletions.
25 changes: 21 additions & 4 deletions src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use rand::prelude::*;
use std::fmt::Display;
use std::ops::ControlFlow;
use std::sync::Arc;
use thiserror::Error;
Expand Down Expand Up @@ -53,6 +54,23 @@ pub enum Error {

pub type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Debug, Error)]
pub struct MultiError(Vec<Box<dyn std::error::Error + Send + Sync>>);

impl Display for MultiError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut needs_comma = false;
for err in &self.0 {
if needs_comma {
write!(f, ", ")?;
}
needs_comma = true;
write!(f, "{err}")?;
}
Ok(())
}
}

/// How to connect to a `Transport`
#[async_trait]
trait ConnectionHandler {
Expand Down Expand Up @@ -470,6 +488,7 @@ where
let mut backoff = Backoff::new(backoff_config);
backoff
.retry_with_backoff("broker_connect", || async {
let mut errors = Vec::<Box<dyn std::error::Error + Send + Sync>>::new();
for broker in &brokers {
let conn = broker
.connect(
Expand All @@ -485,16 +504,14 @@ where
Ok(transport) => transport,
Err(e) => {
warn!(%e, "Failed to connect to broker");
errors.push(Box::new(e));
continue;
}
};

return ControlFlow::Break(connection);
}

let err = Box::<dyn std::error::Error + Send + Sync>::from(
"Failed to connect to any broker, backing off".to_string(),
);
let err = Box::<dyn std::error::Error + Send + Sync>::from(MultiError(errors));
let err: Arc<dyn std::error::Error + Send + Sync> = err.into();
ControlFlow::Continue(ErrorOrThrottle::Error(err))
})
Expand Down
7 changes: 6 additions & 1 deletion tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,12 @@ async fn test_client_backoff_terminates() {

match client_builder.build().await {
Err(rskafka::client::error::Error::Connection(e)) => {
assert_eq!(e.to_string(), "all retries failed: Retry exceeded deadline");
// Error can be slightly different depending on the exact underlying error.
assert!(e.to_string().starts_with(concat!(
"all retries failed: Retry exceeded deadline. ",
"Source: Failed to connect to any broker, backing off. ",
"Errors: error connecting to broker \"localhost:9000\""
)));
}
_ => {
unreachable!();
Expand Down

0 comments on commit 9b70a52

Please sign in to comment.