Skip to content

Commit

Permalink
feat: Capture errors from connecting to broker
Browse files Browse the repository at this point in the history
  • Loading branch information
einarmo committed Jan 23, 2024
1 parent bd45435 commit d121e81
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 5 deletions.
28 changes: 24 additions & 4 deletions src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use rand::prelude::*;
use std::fmt::Display;
use std::ops::ControlFlow;
use std::sync::Arc;
use thiserror::Error;
Expand Down Expand Up @@ -53,6 +54,26 @@ pub enum Error {

pub type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Debug, Error)]
pub struct MultiError(Vec<Box<dyn std::error::Error + Send + Sync>>);

impl Display for MultiError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut needs_comma = false;
if self.0.len() > 1 {
write!(f, "Multiple errors occured: ")?;
}
for err in &self.0 {
if needs_comma {
write!(f, ", ")?;
}
needs_comma = true;
write!(f, "{err}")?;
}
Ok(())
}
}

/// How to connect to a `Transport`
#[async_trait]
trait ConnectionHandler {
Expand Down Expand Up @@ -470,6 +491,7 @@ where
let mut backoff = Backoff::new(backoff_config);
backoff
.retry_with_backoff("broker_connect", || async {
let mut errors = Vec::<Box<dyn std::error::Error + Send + Sync>>::new();
for broker in &brokers {
let conn = broker
.connect(
Expand All @@ -485,16 +507,14 @@ where
Ok(transport) => transport,
Err(e) => {
warn!(%e, "Failed to connect to broker");
errors.push(Box::new(e));
continue;
}
};

return ControlFlow::Break(connection);
}

let err = Box::<dyn std::error::Error + Send + Sync>::from(
"Failed to connect to any broker, backing off".to_string(),
);
let err = Box::<dyn std::error::Error + Send + Sync>::from(MultiError(errors));
let err: Arc<dyn std::error::Error + Send + Sync> = err.into();
ControlFlow::Continue(ErrorOrThrottle::Error(err))
})
Expand Down
10 changes: 9 additions & 1 deletion tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,15 @@ async fn test_client_backoff_terminates() {

match client_builder.build().await {
Err(rskafka::client::error::Error::Connection(e)) => {
assert_eq!(e.to_string(), "all retries failed: Retry exceeded deadline");
// Error can be slightly different depending on the exact underlying error.
assert!(
e.to_string().starts_with(concat!(
"all retries failed: Retry exceeded deadline. ",
"Source: error connecting to broker \"localhost:9000\""
)),
"expected error to start with \"all retries failed...\", actual: {}",
e.to_string()
);
}
_ => {
unreachable!();
Expand Down

0 comments on commit d121e81

Please sign in to comment.