Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/dragonfly-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ minijinja-embed = "2.3.0"

[dev-dependencies]
tempfile = "3.8"
# tokio test-util: paused-clock (`start_paused`) + `time::advance` for testing
# stall/timeout behaviour without real sleeps.
tokio = { version = "1.36.0", features = ["test-util", "macros", "rt-multi-thread", "time"] }
serial_test = "3.0"
# WebSocket client for integration tests (agent push + events stream).
tokio-tungstenite = { version = "0.27" }
131 changes: 118 additions & 13 deletions crates/dragonfly-server/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2648,6 +2648,51 @@ async fn stream_artifact(
}
}

/// Upper bound on how long a single chunk send may wait for the client to
/// drain the stream before the transfer is aborted.
///
/// Passed as `stall_timeout` to `send_chunk` by the range and full-file
/// streaming paths. A client that stops reading entirely (network blip,
/// half-open socket, slowloris) would otherwise pin the streaming task, its
/// open file handle, and buffered memory until the OS TCP timeout (~hours).
/// With this, a fully-stalled transfer is closed so the client (e.g. iPXE) can
/// retry instead of hanging forever at a partial percentage.
///
/// The timer is armed afresh for every chunk, so it only fires when no channel
/// slot frees up for the whole window; a slow client that keeps draining
/// chunks does not trip it.
/// NOTE(review): this presumes one chunk can always be drained within the
/// window — confirm against the chunk size and the slowest supported link.
const STREAM_STALL_TIMEOUT: Duration = Duration::from_secs(10);

/// Result of attempting to hand one chunk to the streaming client.
///
/// The type is a plain tag with no payload, so it is `Copy` and comparable;
/// callers and tests can use `==` / `assert_eq!` as well as `match`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamSendOutcome {
    /// The channel accepted the chunk; streaming may continue.
    Sent,
    /// The receiving half was dropped (client disconnected) — stop streaming.
    ReceiverGone,
    /// No channel slot freed up within `stall_timeout` — abort the transfer so
    /// its resources are released.
    Stalled,
}

/// Push one chunk to the client, giving up if no channel slot frees up within
/// `stall_timeout`.
///
/// Centralises stall-vs-disconnect handling so every streaming path (full-file
/// and range) fails fast on a stuck / slowloris client instead of holding the
/// transfer open until the OS TCP timeout. Both terminal outcomes are logged
/// here, so callers only need to stop streaming when the result is not
/// [`StreamSendOutcome::Sent`].
///
/// # Parameters
/// - `tx`: sending half of the channel feeding the response body stream.
/// - `chunk`: bytes to enqueue; dropped if the send does not complete.
/// - `stall_timeout`: longest time to wait for this single send.
/// - `path`: file being served, used only for log context.
///
/// # Returns
/// - [`StreamSendOutcome::Sent`] when the channel accepted the chunk.
/// - [`StreamSendOutcome::ReceiverGone`] when the receiver was dropped.
/// - [`StreamSendOutcome::Stalled`] when `stall_timeout` elapsed first.
async fn send_chunk(
    tx: &mpsc::Sender<Result<Bytes, Error>>,
    chunk: Bytes,
    stall_timeout: Duration,
    path: &StdPath,
) -> StreamSendOutcome {
    match tokio::time::timeout(stall_timeout, tx.send(Ok(chunk))).await {
        Ok(Ok(())) => StreamSendOutcome::Sent,
        Ok(Err(_)) => {
            // The callers used to log this themselves before the send logic
            // was centralised; keep the disconnect visible in the logs.
            warn!(
                path = %path.display(),
                "Client stream receiver dropped while streaming artifact"
            );
            StreamSendOutcome::ReceiverGone
        }
        Err(_) => {
            warn!(
                path = %path.display(),
                "Client stalled streaming artifact (no progress in {:?}); aborting transfer",
                stall_timeout
            );
            StreamSendOutcome::Stalled
        }
    }
}

async fn read_file_as_stream(
path: &StdPath,
range_header: Option<&HeaderValue>, // Add parameter for Range header
Expand Down Expand Up @@ -2771,13 +2816,9 @@ async fn read_file_as_stream(
}
}

// Send the complete range as a single chunk
if tx.send(Ok(Bytes::from(buffer))).await.is_err() {
warn!(
"Client stream receiver dropped for file {} while sending range",
path_buf.display()
);
}
// Send the complete range as a single chunk (abort on stall/drop)
let _ =
send_chunk(&tx, Bytes::from(buffer), STREAM_STALL_TIMEOUT, &path_buf).await;
// Task finishes, tx is dropped, stream closes.
}
Err(e) => {
Expand Down Expand Up @@ -2843,12 +2884,9 @@ async fn read_file_as_stream(
} // else: Skipping progress track because total_size is 0 (logged elsewhere if needed)
} // else: Skipping progress track because machine_id or state is missing

if tx.send(Ok(chunk)).await.is_err() {
warn!(
"Client stream receiver dropped for file {}",
path_buf.display()
);
break; // Exit loop if receiver is gone
match send_chunk(&tx, chunk, STREAM_STALL_TIMEOUT, &path_buf).await {
StreamSendOutcome::Sent => {}
StreamSendOutcome::ReceiverGone | StreamSendOutcome::Stalled => break,
}
}
Err(e) => {
Expand Down Expand Up @@ -10464,6 +10502,73 @@ mod stream_artifact_tests {
(0..200_000u32).map(|i| (i % 256) as u8).collect()
}

#[tokio::test]
async fn send_chunk_succeeds_when_receiver_drains() {
let (tx, mut rx) = mpsc::channel::<Result<Bytes, Error>>(8);
let outcome = send_chunk(
&tx,
Bytes::from_static(b"hello"),
Duration::from_secs(1),
StdPath::new("/t"),
)
.await;
assert!(matches!(outcome, StreamSendOutcome::Sent));
let chunk = rx
.recv()
.await
.expect("receiver gets chunk")
.expect("ok chunk");
assert_eq!(chunk.as_ref(), &b"hello"[..]);
}

#[tokio::test]
async fn send_chunk_detects_receiver_drop() {
let (tx, rx) = mpsc::channel::<Result<Bytes, Error>>(8);
drop(rx); // client disconnected
let outcome = send_chunk(
&tx,
Bytes::from_static(b"hello"),
Duration::from_secs(1),
StdPath::new("/t"),
)
.await;
assert!(matches!(outcome, StreamSendOutcome::ReceiverGone));
}

/// A client that stops reading entirely (e.g. a boot client stuck at a
/// partial percentage, or a slowloris) must trip the stall timeout so the
/// transfer is aborted and the task + file handle are freed, instead of
/// hanging until the OS TCP timeout.
#[tokio::test(start_paused = true)]
async fn send_chunk_aborts_when_client_stalls() {
    let stall_timeout = Duration::from_millis(50);

    // Capacity 1, receiver never polled: after the first send fills the slot,
    // the second send has nowhere to go and waits for a drain that never comes.
    // `_rx` stays alive so the send blocks instead of reporting a disconnect.
    let (tx, _rx) = mpsc::channel::<Result<Bytes, Error>>(1);
    tx.send(Ok(Bytes::from_static(b"x")))
        .await
        .expect("fill channel slot");

    // With the clock paused, the runtime auto-advances virtual time to the
    // next pending timer once it is otherwise idle, so simply awaiting the
    // helper runs the timeout to completion without real sleeping or a manual
    // poll loop.
    let started = tokio::time::Instant::now();
    let outcome = send_chunk(
        &tx,
        Bytes::from_static(b"y"),
        stall_timeout,
        StdPath::new("/t"),
    )
    .await;

    assert!(
        matches!(outcome, StreamSendOutcome::Stalled),
        "expected Stalled, got {:?}",
        outcome
    );
    // Guard against a regression where the helper reports a stall without
    // actually waiting out the configured window.
    assert!(
        started.elapsed() >= stall_timeout,
        "stall reported after {:?}, before the {:?} window elapsed",
        started.elapsed(),
        stall_timeout
    );
}

#[tokio::test]
async fn full_file_serves_200_with_headers_and_body() {
let data = two_hundred_kb();
Expand Down
Loading