diff --git a/crates/dragonfly-server/Cargo.toml b/crates/dragonfly-server/Cargo.toml index 430c8856..c81d4244 100644 --- a/crates/dragonfly-server/Cargo.toml +++ b/crates/dragonfly-server/Cargo.toml @@ -154,6 +154,9 @@ minijinja-embed = "2.3.0" [dev-dependencies] tempfile = "3.8" +# tokio test-util: paused-clock (`start_paused`) + `time::advance` for testing +# stall/timeout behaviour without real sleeps. +tokio = { version = "1.36.0", features = ["test-util", "macros", "rt-multi-thread", "time"] } serial_test = "3.0" # WebSocket client for integration tests (agent push + events stream). tokio-tungstenite = { version = "0.27" } \ No newline at end of file diff --git a/crates/dragonfly-server/src/api.rs b/crates/dragonfly-server/src/api.rs index 6ed8aa7d..936e9c09 100644 --- a/crates/dragonfly-server/src/api.rs +++ b/crates/dragonfly-server/src/api.rs @@ -2648,6 +2648,51 @@ async fn stream_artifact( } } +/// Maximum time to wait for a client to drain a streamed chunk before aborting +/// the transfer. A client that stops reading entirely (network blip, half-open +/// socket, slowloris) would otherwise pin the streaming task, its open file +/// handle, and buffered memory until the OS TCP timeout (~hours). With this, a +/// fully-stalled transfer is closed so the client (e.g. iPXE) can retry instead +/// of hanging forever at a partial percentage. Slow-but-progressing clients +/// never trip it: `tx.send` only stalls when the client makes zero progress for +/// the whole window. +const STREAM_STALL_TIMEOUT: Duration = Duration::from_secs(10); + +/// Outcome of pushing one chunk to the streaming client. +#[derive(Debug)] +enum StreamSendOutcome { + /// Chunk accepted by the channel. + Sent, + /// Client disconnected (receiver dropped) — stop streaming. + ReceiverGone, + /// Client made no progress for `stall_timeout` — abort to free resources. + Stalled, +} + +/// Send one chunk, aborting if the client doesn't drain a slot within +/// `stall_timeout`. Centralises stall-vs-disconnect handling so every streaming +/// path (full-file and range) fails fast on a stuck / slowloris client instead +/// of holding the transfer open until the OS TCP timeout. +async fn send_chunk( + tx: &mpsc::Sender>, + chunk: Bytes, + stall_timeout: Duration, + path: &StdPath, +) -> StreamSendOutcome { + match tokio::time::timeout(stall_timeout, tx.send(Ok(chunk))).await { + Ok(Ok(())) => StreamSendOutcome::Sent, + Ok(Err(_)) => StreamSendOutcome::ReceiverGone, + Err(_) => { + warn!( + path = %path.display(), + "Client stalled streaming artifact (no progress in {:?}); aborting transfer", + stall_timeout + ); + StreamSendOutcome::Stalled + } + } +} + async fn read_file_as_stream( path: &StdPath, range_header: Option<&HeaderValue>, // Add parameter for Range header @@ -2771,13 +2816,9 @@ async fn read_file_as_stream( } } - // Send the complete range as a single chunk - if tx.send(Ok(Bytes::from(buffer))).await.is_err() { - warn!( - "Client stream receiver dropped for file {} while sending range", - path_buf.display() - ); - } + // Send the complete range as a single chunk (abort on stall/drop) + let _ = + send_chunk(&tx, Bytes::from(buffer), STREAM_STALL_TIMEOUT, &path_buf).await; // Task finishes, tx is dropped, stream closes. } Err(e) => { @@ -2843,12 +2884,9 @@ async fn read_file_as_stream( } // else: Skipping progress track because total_size is 0 (logged elsewhere if needed) } // else: Skipping progress track because machine_id or state is missing - if tx.send(Ok(chunk)).await.is_err() { - warn!( - "Client stream receiver dropped for file {}", - path_buf.display() - ); - break; // Exit loop if receiver is gone + match send_chunk(&tx, chunk, STREAM_STALL_TIMEOUT, &path_buf).await { + StreamSendOutcome::Sent => {} + StreamSendOutcome::ReceiverGone | StreamSendOutcome::Stalled => break, } } Err(e) => { @@ -10464,6 +10502,73 @@ mod stream_artifact_tests { (0..200_000u32).map(|i| (i % 256) as u8).collect() } + #[tokio::test] + async fn send_chunk_succeeds_when_receiver_drains() { + let (tx, mut rx) = mpsc::channel::>(8); + let outcome = send_chunk( + &tx, + Bytes::from_static(b"hello"), + Duration::from_secs(1), + StdPath::new("/t"), + ) + .await; + assert!(matches!(outcome, StreamSendOutcome::Sent)); + let chunk = rx + .recv() + .await + .expect("receiver gets chunk") + .expect("ok chunk"); + assert_eq!(chunk.as_ref(), &b"hello"[..]); + } + + #[tokio::test] + async fn send_chunk_detects_receiver_drop() { + let (tx, rx) = mpsc::channel::>(8); + drop(rx); // client disconnected + let outcome = send_chunk( + &tx, + Bytes::from_static(b"hello"), + Duration::from_secs(1), + StdPath::new("/t"), + ) + .await; + assert!(matches!(outcome, StreamSendOutcome::ReceiverGone)); + } + + /// A client that stops reading entirely (k8s03 stuck at 57%, slowloris) + /// must trip the stall timeout so the transfer is aborted and the task + + /// file handle are freed, instead of hanging until the OS TCP timeout. + #[tokio::test(start_paused = true)] + async fn send_chunk_aborts_when_client_stalls() { + // capacity 1, receiver never polled: after the first send fills the slot, + // the second send has nowhere to go and waits for a drain that never comes. + let (tx, _rx) = mpsc::channel::>(1); + tx.send(Ok(Bytes::from_static(b"x"))) + .await + .expect("fill channel slot"); + + let fut = send_chunk( + &tx, + Bytes::from_static(b"y"), + Duration::from_millis(50), + StdPath::new("/t"), + ); + tokio::pin!(fut); + + // Drive the paused clock forward until the 50ms stall timeout fires. + let outcome = loop { + tokio::time::advance(Duration::from_millis(20)).await; + if let std::task::Poll::Ready(o) = futures::poll!(&mut fut) { + break o; + } + }; + assert!( + matches!(outcome, StreamSendOutcome::Stalled), + "expected Stalled, got {:?}", + outcome + ); + } + #[tokio::test] async fn full_file_serves_200_with_headers_and_body() { let data = two_hundred_kb();