Skip to content

Commit

Permalink
Issue 361 - Improve http2 connection error handling
Browse files Browse the repository at this point in the history
- Partial fix for hatoo#361
- ONLY implemented for the `-z 10s` (work_until) case
- TODO:
   - [ ] The futures are not aborted when the timer is hit, which will cause long-running requests to delay the program exit; this is only due to a borrow/move problem that I cannot figure out
   - [ ] Implement for the non-`work_until` cases
   - [ ] Add a timeout to the TCP socket setup - this appears to be where some of the delay on shutdown is happening if the server closes after startup
   - [ ] Consider adding a delay to the reconnect loop so that it will not try to connect more than 1 time per second per concurrent connection - Without this the connect loop will spin at ~23k connect attempts/second for `-c 20`, for example
- Test cases:
  - Start with the server not running at all (never connects)
    - Currently this will exit on time
    - IMPROVED: Previously this would attempt to connect once for each `-c`, fail, and immediately exit
    - IMPROVED: Currently this will repeatedly try to connect until the specified timeout expires, then it will exit
  - Start with the server running and leave it running
    - This works fine as before
  - Start with the server running, exit the server, then restart the server before the test completes
     - This initially makes requests
     - IMPROVED: Previously this would OOM even if the server restarted
     - IMPROVED: Currently this will reconnect and continue making requests if the server restarts
  • Loading branch information
huntharo committed Jan 5, 2024
1 parent 16d1df8 commit 5cbb862
Showing 1 changed file with 90 additions and 25 deletions.
115 changes: 90 additions & 25 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ impl Client {
url: &Url,
) -> Result<Stream, ClientError> {
if url.scheme() == "https" {
// TODO: This may be where some of the connection loops are hanging when
// the server closes; we may need to add a timeout here so that we can exit
// more closely to the configured timeout
self.tls_client(addr, url).await
} else if let Some(socket_path) = &self.unix_socket {
Ok(Stream::Unix(
Expand Down Expand Up @@ -423,6 +426,8 @@ impl Client {
.await
.is_err()
{
// This gets hit when the connection for HTTP/1.1 faults
// This re-connects
start = std::time::Instant::now();
let addr = self.dns.lookup(&url, &mut client_state.rng).await?;
let dns_lookup = std::time::Instant::now();
Expand Down Expand Up @@ -698,6 +703,20 @@ fn is_too_many_open_files(res: &Result<RequestResult, ClientError>) -> bool {
.unwrap_or(false)
}

/// Reports whether `res` failed with a Hyper error.
///
/// Returns `true` only when `res` is `Err(ClientError::HyperError(_))`; an
/// `Ok` value or any other error variant yields `false`. This is primarily
/// intended for spotting HTTP/2 connection errors.
fn is_hyper_error(res: &Result<RequestResult, ClientError>) -> bool {
    // REVIEW: `ClientError::IoError` is intentionally not matched yet. If an
    // I/O error indicates that the underlying connection has failed, it
    // should also cause HTTP/2 requests to stop.
    matches!(res, Err(ClientError::HyperError(_)))
}

async fn setup_http2(client: &Client) -> Result<(ConnectionTime, ClientStateHttp2), ClientError> {
let mut rng = StdRng::from_entropy();
let url = client.url_generator.generate(&mut rng)?;
Expand Down Expand Up @@ -1041,42 +1060,87 @@ pub async fn work_until(
n_connections: usize,
n_http2_parallel: usize,
) {
use std::sync::atomic::{AtomicBool, Ordering};
let client = Arc::new(client);
if client.is_http2() {
let should_exit = Arc::new(AtomicBool::new(false));
let futures = (0..n_connections)
.map(|_| {
let client = client.clone();
let report_tx = report_tx.clone();
let should_exit = Arc::clone(&should_exit);
tokio::spawn(async move {
match setup_http2(&client).await {
Ok((connection_time, client_state)) => {
let futures = (0..n_http2_parallel)
.map(|_| {
let client = client.clone();
let report_tx = report_tx.clone();
let mut client_state = client_state.clone();
tokio::spawn(async move {
loop {
let mut res =
client.work_http2(&mut client_state).await;
let is_cancel = is_too_many_open_files(&res);
set_connection_time(&mut res, connection_time);
report_tx.send_async(res).await.unwrap();
if is_cancel {
break;
// Keep trying to establish or re-establish connections up to the deadline
loop {
// Stop if the deadline has passed
// This happens when a connection is never able to be established
// since the response handling / timeout logic never gets a chance to run
if std::time::Instant::now() > dead_line.into() {
break;
}
match setup_http2(&client).await {
Ok((connection_time, client_state)) => {
// Setup the parallel workers for each HTTP2 connection
loop {
if std::time::Instant::now() > dead_line.into() {
break;
}
let futures = (0..n_http2_parallel)
.map(|_| {
let client = client.clone();
let report_tx = report_tx.clone();
let mut client_state = client_state.clone();
let should_exit = Arc::clone(&should_exit);
tokio::spawn(async move {
// This is where HTTP2 loops to make all the requests for a given client and worker
loop {
let mut res =
client.work_http2(&mut client_state).await;
let is_cancel = is_too_many_open_files(&res);
let is_hyper_error = is_hyper_error(&res);
set_connection_time(&mut res, connection_time);
report_tx.send_async(res).await.unwrap();
if is_cancel || is_hyper_error || should_exit.load(Ordering::Relaxed) {
break;
}
}
})
})
.collect::<Vec<_>>();

tokio::select! {
result = futures::future::try_join_all(futures) => {
match result {
Ok(_) => {
// All tasks completed successfully
break;
}
Err(_) => {
// Re-establish the connection and restart the tasks
should_exit.store(true, Ordering::Relaxed);
break;
}
}
}
})
})
.collect::<Vec<_>>();

tokio::time::sleep_until(dead_line.into()).await;
for f in futures {
f.abort();
_ = tokio::time::sleep_until(dead_line.into()) => {
// TODO: Ideally we would abort all the futures here
// This would be particularly important for cases where a tested request
// takes, say, 30 seconds to complete on avg. In that case it will be really
// obvious that the test does not exit at any specific configured time, but
// rather up to 30 seconds after that time.
// I cannot figure out how to abort the futures without triggering borrow/move errors
// for f in futures {
// f.abort();
// }
should_exit.store(true, Ordering::Relaxed);
break;
}
}
}
}
}

Err(err) => report_tx.send_async(Err(err)).await.unwrap(),
Err(err) => report_tx.send_async(Err(err)).await.unwrap(),
}
}
})
})
Expand All @@ -1092,6 +1156,7 @@ pub async fn work_until(
let mut client_state = ClientStateHttp1::default();
tokio::spawn(async move {
loop {
// This is where HTTP1 loops to make all the requests for a given client
let res = client.work_http1(&mut client_state).await;
let is_cancel = is_too_many_open_files(&res);
report_tx.send_async(res).await.unwrap();
Expand Down

0 comments on commit 5cbb862

Please sign in to comment.