Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions crates/dragonfly-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ uuid = { version = "1.8.0", features = ["v4", "v5", "v7", "serde"] }
tower = { version = "0.4", features = ["util"] }
tower-http = { version = "0.5.0", features = ["fs", "trace", "cors"] }
tokio = { version = "1.36.0", features = ["full"] }
socket2 = { version = "0.5", features = ["all"] }
tokio-stream = { version = "0.1", features = ["sync"] }
futures = "0.3"
serde = { version = "1.0", features = ["derive"] }
Expand Down Expand Up @@ -154,9 +155,6 @@ minijinja-embed = "2.3.0"

[dev-dependencies]
tempfile = "3.8"
# tokio test-util: paused-clock (`start_paused`) + `time::advance` for testing
# stall/timeout behaviour without real sleeps.
tokio = { version = "1.36.0", features = ["test-util", "macros", "rt-multi-thread", "time"] }
serial_test = "3.0"
# WebSocket client for integration tests (agent push + events stream).
tokio-tungstenite = { version = "0.27" }
187 changes: 69 additions & 118 deletions crates/dragonfly-server/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ pub fn api_router() -> Router<crate::AppState> {
Router::new()
.route("/machines", get(get_all_machines).post(register_machine))
.route("/machines/admin/create", post(admin_create_machine))
// Look up a single machine by MAC — the right primitive for "is this MAC
// registered, and is it Installed?" in one call, instead of paginating
// the full machine list to find one row (which silently misses any
// machine past page 1).
.route("/machines/by-mac/{mac}", get(get_machine_by_mac))
.route("/machines/install-status", get(get_install_status))
.route("/machines/{id}/os", get(get_machine_os).post(assign_os))
.route("/machines/{id}/reimage", post(reimage_machine)) // Add new reimage endpoint
Expand Down Expand Up @@ -1205,6 +1210,50 @@ async fn get_machine(State(state): State<AppState>, Path(id): Path<Uuid>) -> Res
}
}

/// Look up a single machine by MAC address.
///
/// Returns the `simple` detail projection (`id`, `hostname`, `ip_address`,
/// `mac_address`, `status`, `tags`) — the same shape `GET /machines` emits per
/// row — or `404` when no machine has that MAC.
///
/// This is the primitive automation wants for "is this MAC registered, and is
/// it Installed?": one row, one call. The alternative callers used to reach for
/// — `GET /machines` and scanning for the MAC — paginates, so any machine past
/// page 1 looks absent and gets spuriously re-imaged on every converge run.
#[axum::debug_handler]
async fn get_machine_by_mac(
State(state): State<AppState>,
_caller: AuthenticatedCaller, // authenticated read (same posture as the machine list)
Path(mac): Path<String>,
) -> Response {
let normalized = dragonfly_common::normalize_mac(&mac);
match state.store.get_machine_by_mac(&normalized).await {
Ok(Some(v1_machine)) => {
let machine = machine_to_common(&v1_machine);
(
StatusCode::OK,
Json(machine_to_detail_level(&machine, "simple")),
)
.into_response()
}
Ok(None) => {
let error_response = ErrorResponse {
error: "Not Found".to_string(),
message: format!("Machine with MAC {} not found", mac),
};
(StatusCode::NOT_FOUND, Json(error_response)).into_response()
}
Err(e) => {
error!("Failed to retrieve machine by MAC {}: {}", mac, e);
let error_response = ErrorResponse {
error: "Database Error".to_string(),
message: e.to_string(),
};
(StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)).into_response()
}
}
}

// Combined OS assignment handler
#[axum::debug_handler]
async fn assign_os(
Expand Down Expand Up @@ -2648,51 +2697,6 @@ async fn stream_artifact(
}
}

/// Maximum time to wait for a client to drain a streamed chunk before aborting
/// the transfer. A client that stops reading entirely (network blip, half-open
/// socket, slowloris) would otherwise pin the streaming task, its open file
/// handle, and buffered memory until the OS TCP timeout (~hours). With this, a
/// fully-stalled transfer is closed so the client (e.g. iPXE) can retry instead
/// of hanging forever at a partial percentage. Slow-but-progressing clients
/// never trip it: `tx.send` only stalls when the client makes zero progress for
/// the whole window.
const STREAM_STALL_TIMEOUT: Duration = Duration::from_secs(10);

/// Outcome of pushing one chunk to the streaming client.
#[derive(Debug)]
enum StreamSendOutcome {
/// Chunk accepted by the channel.
Sent,
/// Client disconnected (receiver dropped) — stop streaming.
ReceiverGone,
/// Client made no progress for `stall_timeout` — abort to free resources.
Stalled,
}

/// Send one chunk, aborting if the client doesn't drain a slot within
/// `stall_timeout`. Centralises stall-vs-disconnect handling so every streaming
/// path (full-file and range) fails fast on a stuck / slowloris client instead
/// of holding the transfer open until the OS TCP timeout.
async fn send_chunk(
tx: &mpsc::Sender<Result<Bytes, Error>>,
chunk: Bytes,
stall_timeout: Duration,
path: &StdPath,
) -> StreamSendOutcome {
match tokio::time::timeout(stall_timeout, tx.send(Ok(chunk))).await {
Ok(Ok(())) => StreamSendOutcome::Sent,
Ok(Err(_)) => StreamSendOutcome::ReceiverGone,
Err(_) => {
warn!(
path = %path.display(),
"Client stalled streaming artifact (no progress in {:?}); aborting transfer",
stall_timeout
);
StreamSendOutcome::Stalled
}
}
}

async fn read_file_as_stream(
path: &StdPath,
range_header: Option<&HeaderValue>, // Add parameter for Range header
Expand Down Expand Up @@ -2816,9 +2820,14 @@ async fn read_file_as_stream(
}
}

// Send the complete range as a single chunk (abort on stall/drop)
let _ =
send_chunk(&tx, Bytes::from(buffer), STREAM_STALL_TIMEOUT, &path_buf).await;
// Send the complete range as a single chunk. tx.send paces to the
// client's read rate — a slow client drains slowly; do NOT abort.
if tx.send(Ok(Bytes::from(buffer))).await.is_err() {
warn!(
"Client stream receiver dropped for file {} while sending range",
path_buf.display()
);
}
// Task finishes, tx is dropped, stream closes.
}
Err(e) => {
Expand Down Expand Up @@ -2884,9 +2893,18 @@ async fn read_file_as_stream(
} // else: Skipping progress track because total_size is 0 (logged elsewhere if needed)
} // else: Skipping progress track because machine_id or state is missing

match send_chunk(&tx, chunk, STREAM_STALL_TIMEOUT, &path_buf).await {
StreamSendOutcome::Sent => {}
StreamSendOutcome::ReceiverGone | StreamSendOutcome::Stalled => break,
// Plain backpressure: tx.send paces the stream to the client's
// read rate. A slow client (e.g. BIOS iPXE on a contended box)
// drains at its own speed and finishes — do NOT abort on a slow
// drain, that turns a healthy-but-slow transfer into an infinite
// restart loop. Dead-peer cleanup is TCP keepalive's job, not a
// progress timer.
if tx.send(Ok(chunk)).await.is_err() {
warn!(
"Client stream receiver dropped for file {}",
path_buf.display()
);
break; // receiver gone (client disconnected)
}
}
Err(e) => {
Expand Down Expand Up @@ -10502,73 +10520,6 @@ mod stream_artifact_tests {
(0..200_000u32).map(|i| (i % 256) as u8).collect()
}

#[tokio::test]
async fn send_chunk_succeeds_when_receiver_drains() {
let (tx, mut rx) = mpsc::channel::<Result<Bytes, Error>>(8);
let outcome = send_chunk(
&tx,
Bytes::from_static(b"hello"),
Duration::from_secs(1),
StdPath::new("/t"),
)
.await;
assert!(matches!(outcome, StreamSendOutcome::Sent));
let chunk = rx
.recv()
.await
.expect("receiver gets chunk")
.expect("ok chunk");
assert_eq!(chunk.as_ref(), &b"hello"[..]);
}

#[tokio::test]
async fn send_chunk_detects_receiver_drop() {
let (tx, rx) = mpsc::channel::<Result<Bytes, Error>>(8);
drop(rx); // client disconnected
let outcome = send_chunk(
&tx,
Bytes::from_static(b"hello"),
Duration::from_secs(1),
StdPath::new("/t"),
)
.await;
assert!(matches!(outcome, StreamSendOutcome::ReceiverGone));
}

/// A client that stops reading entirely (k8s03 stuck at 57%, slowloris)
/// must trip the stall timeout so the transfer is aborted and the task +
/// file handle are freed, instead of hanging until the OS TCP timeout.
#[tokio::test(start_paused = true)]
async fn send_chunk_aborts_when_client_stalls() {
// capacity 1, receiver never polled: after the first send fills the slot,
// the second send has nowhere to go and waits for a drain that never comes.
let (tx, _rx) = mpsc::channel::<Result<Bytes, Error>>(1);
tx.send(Ok(Bytes::from_static(b"x")))
.await
.expect("fill channel slot");

let fut = send_chunk(
&tx,
Bytes::from_static(b"y"),
Duration::from_millis(50),
StdPath::new("/t"),
);
tokio::pin!(fut);

// Drive the paused clock forward until the 50ms stall timeout fires.
let outcome = loop {
tokio::time::advance(Duration::from_millis(20)).await;
if let std::task::Poll::Ready(o) = futures::poll!(&mut fut) {
break o;
}
};
assert!(
matches!(outcome, StreamSendOutcome::Stalled),
"expected Stalled, got {:?}",
outcome
);
}

#[tokio::test]
async fn full_file_serves_200_with_headers_and_body() {
let data = two_hundred_kb();
Expand Down
23 changes: 23 additions & 0 deletions crates/dragonfly-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1529,6 +1529,29 @@ pub async fn run() -> anyhow::Result<()> {
});
};

// TCP keepalive + NODELAY on every accepted connection.
//
// Keepalive is the *correct* dead-peer detector: it probes the socket and
// only closes it when the peer is truly unresponsive (no ACKs). A slow
// client (e.g. BIOS iPXE on a contended box) still ACKs, so it is never
// harmed — unlike a progress-based stall timeout, which cannot tell "slow"
// from "dead" and aborts healthy-but-slow transfers mid-download. Stalled
// in-flight streams are reaped by TCP retransmission; idle-vanished peers
// are reaped here (~60s idle + a few probes), not after the OS default of
// hours.
use axum::serve::ListenerExt;
let listener = listener.tap_io(|stream| {
let keepalive = socket2::TcpKeepalive::new()
.with_time(std::time::Duration::from_secs(60))
.with_interval(std::time::Duration::from_secs(15));
if let Err(e) = socket2::SockRef::from(&*stream).set_tcp_keepalive(&keepalive) {
debug!(error = %e, "failed to set TCP keepalive on incoming connection");
}
if let Err(e) = stream.set_nodelay(true) {
debug!(error = %e, "failed to set TCP_NODELAY on incoming connection");
}
});

axum::serve(
listener,
app.into_make_service_with_connect_info::<SocketAddr>(),
Expand Down
Loading