diff --git a/Cargo.lock b/Cargo.lock index dcdc707f..3c5cd314 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1974,6 +1974,7 @@ dependencies = [ "serde_yaml", "serial_test", "sha2 0.10.9", + "socket2 0.5.10", "sqlx", "ssh-key", "sysinfo", diff --git a/crates/dragonfly-server/Cargo.toml b/crates/dragonfly-server/Cargo.toml index c81d4244..e40ae4b8 100644 --- a/crates/dragonfly-server/Cargo.toml +++ b/crates/dragonfly-server/Cargo.toml @@ -37,6 +37,7 @@ uuid = { version = "1.8.0", features = ["v4", "v5", "v7", "serde"] } tower = { version = "0.4", features = ["util"] } tower-http = { version = "0.5.0", features = ["fs", "trace", "cors"] } tokio = { version = "1.36.0", features = ["full"] } +socket2 = { version = "0.5", features = ["all"] } tokio-stream = { version = "0.1", features = ["sync"] } futures = "0.3" serde = { version = "1.0", features = ["derive"] } @@ -154,9 +155,6 @@ minijinja-embed = "2.3.0" [dev-dependencies] tempfile = "3.8" -# tokio test-util: paused-clock (`start_paused`) + `time::advance` for testing -# stall/timeout behaviour without real sleeps. -tokio = { version = "1.36.0", features = ["test-util", "macros", "rt-multi-thread", "time"] } serial_test = "3.0" # WebSocket client for integration tests (agent push + events stream). tokio-tungstenite = { version = "0.27" } \ No newline at end of file diff --git a/crates/dragonfly-server/src/api.rs b/crates/dragonfly-server/src/api.rs index 936e9c09..e439e217 100644 --- a/crates/dragonfly-server/src/api.rs +++ b/crates/dragonfly-server/src/api.rs @@ -249,6 +249,11 @@ pub fn api_router() -> Router { Router::new() .route("/machines", get(get_all_machines).post(register_machine)) .route("/machines/admin/create", post(admin_create_machine)) + // Look up a single machine by MAC — the right primitive for "is this MAC + // registered, and is it Installed?" in one call, instead of paginating + // the full machine list to find one row (which silently misses any + // machine past page 1). + .route("/machines/by-mac/{mac}", get(get_machine_by_mac)) .route("/machines/install-status", get(get_install_status)) .route("/machines/{id}/os", get(get_machine_os).post(assign_os)) .route("/machines/{id}/reimage", post(reimage_machine)) // Add new reimage endpoint @@ -1205,6 +1210,50 @@ async fn get_machine(State(state): State, Path(id): Path) -> Res } } +/// Look up a single machine by MAC address. +/// +/// Returns the `simple` detail projection (`id`, `hostname`, `ip_address`, +/// `mac_address`, `status`, `tags`) — the same shape `GET /machines` emits per +/// row — or `404` when no machine has that MAC. +/// +/// This is the primitive automation wants for "is this MAC registered, and is +/// it Installed?": one row, one call. The alternative callers used to reach for +/// — `GET /machines` and scanning for the MAC — paginates, so any machine past +/// page 1 looks absent and gets spuriously re-imaged on every converge run. +#[axum::debug_handler] +async fn get_machine_by_mac( + State(state): State, + _caller: AuthenticatedCaller, // authenticated read (same posture as the machine list) + Path(mac): Path, +) -> Response { + let normalized = dragonfly_common::normalize_mac(&mac); + match state.store.get_machine_by_mac(&normalized).await { + Ok(Some(v1_machine)) => { + let machine = machine_to_common(&v1_machine); + ( + StatusCode::OK, + Json(machine_to_detail_level(&machine, "simple")), + ) + .into_response() + } + Ok(None) => { + let error_response = ErrorResponse { + error: "Not Found".to_string(), + message: format!("Machine with MAC {} not found", mac), + }; + (StatusCode::NOT_FOUND, Json(error_response)).into_response() + } + Err(e) => { + error!("Failed to retrieve machine by MAC {}: {}", mac, e); + let error_response = ErrorResponse { + error: "Database Error".to_string(), + message: e.to_string(), + }; + (StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)).into_response() + } + } +} + // Combined OS assignment handler #[axum::debug_handler] async fn assign_os( @@ -2648,51 +2697,6 @@ async fn stream_artifact( } } -/// Maximum time to wait for a client to drain a streamed chunk before aborting -/// the transfer. A client that stops reading entirely (network blip, half-open -/// socket, slowloris) would otherwise pin the streaming task, its open file -/// handle, and buffered memory until the OS TCP timeout (~hours). With this, a -/// fully-stalled transfer is closed so the client (e.g. iPXE) can retry instead -/// of hanging forever at a partial percentage. Slow-but-progressing clients -/// never trip it: `tx.send` only stalls when the client makes zero progress for -/// the whole window. -const STREAM_STALL_TIMEOUT: Duration = Duration::from_secs(10); - -/// Outcome of pushing one chunk to the streaming client. -#[derive(Debug)] -enum StreamSendOutcome { - /// Chunk accepted by the channel. - Sent, - /// Client disconnected (receiver dropped) — stop streaming. - ReceiverGone, - /// Client made no progress for `stall_timeout` — abort to free resources. - Stalled, -} - -/// Send one chunk, aborting if the client doesn't drain a slot within -/// `stall_timeout`. Centralises stall-vs-disconnect handling so every streaming -/// path (full-file and range) fails fast on a stuck / slowloris client instead -/// of holding the transfer open until the OS TCP timeout. -async fn send_chunk( - tx: &mpsc::Sender>, - chunk: Bytes, - stall_timeout: Duration, - path: &StdPath, -) -> StreamSendOutcome { - match tokio::time::timeout(stall_timeout, tx.send(Ok(chunk))).await { - Ok(Ok(())) => StreamSendOutcome::Sent, - Ok(Err(_)) => StreamSendOutcome::ReceiverGone, - Err(_) => { - warn!( - path = %path.display(), - "Client stalled streaming artifact (no progress in {:?}); aborting transfer", - stall_timeout - ); - StreamSendOutcome::Stalled - } - } -} - async fn read_file_as_stream( path: &StdPath, range_header: Option<&HeaderValue>, // Add parameter for Range header @@ -2816,9 +2820,14 @@ async fn read_file_as_stream( } } - // Send the complete range as a single chunk (abort on stall/drop) - let _ = - send_chunk(&tx, Bytes::from(buffer), STREAM_STALL_TIMEOUT, &path_buf).await; + // Send the complete range as a single chunk. tx.send paces to the + // client's read rate — a slow client drains slowly; do NOT abort. + if tx.send(Ok(Bytes::from(buffer))).await.is_err() { + warn!( + "Client stream receiver dropped for file {} while sending range", + path_buf.display() + ); + } // Task finishes, tx is dropped, stream closes. } Err(e) => { @@ -2884,9 +2893,18 @@ async fn read_file_as_stream( } // else: Skipping progress track because total_size is 0 (logged elsewhere if needed) } // else: Skipping progress track because machine_id or state is missing - match send_chunk(&tx, chunk, STREAM_STALL_TIMEOUT, &path_buf).await { - StreamSendOutcome::Sent => {} - StreamSendOutcome::ReceiverGone | StreamSendOutcome::Stalled => break, + // Plain backpressure: tx.send paces the stream to the client's + // read rate. A slow client (e.g. BIOS iPXE on a contended box) + // drains at its own speed and finishes — do NOT abort on a slow + // drain, that turns a healthy-but-slow transfer into an infinite + // restart loop. Dead-peer cleanup is TCP keepalive's job, not a + // progress timer. + if tx.send(Ok(chunk)).await.is_err() { + warn!( + "Client stream receiver dropped for file {}", + path_buf.display() + ); + break; // receiver gone (client disconnected) } } Err(e) => { @@ -10502,73 +10520,6 @@ mod stream_artifact_tests { (0..200_000u32).map(|i| (i % 256) as u8).collect() } - #[tokio::test] - async fn send_chunk_succeeds_when_receiver_drains() { - let (tx, mut rx) = mpsc::channel::>(8); - let outcome = send_chunk( - &tx, - Bytes::from_static(b"hello"), - Duration::from_secs(1), - StdPath::new("/t"), - ) - .await; - assert!(matches!(outcome, StreamSendOutcome::Sent)); - let chunk = rx - .recv() - .await - .expect("receiver gets chunk") - .expect("ok chunk"); - assert_eq!(chunk.as_ref(), &b"hello"[..]); - } - - #[tokio::test] - async fn send_chunk_detects_receiver_drop() { - let (tx, rx) = mpsc::channel::>(8); - drop(rx); // client disconnected - let outcome = send_chunk( - &tx, - Bytes::from_static(b"hello"), - Duration::from_secs(1), - StdPath::new("/t"), - ) - .await; - assert!(matches!(outcome, StreamSendOutcome::ReceiverGone)); - } - - /// A client that stops reading entirely (k8s03 stuck at 57%, slowloris) - /// must trip the stall timeout so the transfer is aborted and the task + - /// file handle are freed, instead of hanging until the OS TCP timeout. - #[tokio::test(start_paused = true)] - async fn send_chunk_aborts_when_client_stalls() { - // capacity 1, receiver never polled: after the first send fills the slot, - // the second send has nowhere to go and waits for a drain that never comes. - let (tx, _rx) = mpsc::channel::>(1); - tx.send(Ok(Bytes::from_static(b"x"))) - .await - .expect("fill channel slot"); - - let fut = send_chunk( - &tx, - Bytes::from_static(b"y"), - Duration::from_millis(50), - StdPath::new("/t"), - ); - tokio::pin!(fut); - - // Drive the paused clock forward until the 50ms stall timeout fires. - let outcome = loop { - tokio::time::advance(Duration::from_millis(20)).await; - if let std::task::Poll::Ready(o) = futures::poll!(&mut fut) { - break o; - } - }; - assert!( - matches!(outcome, StreamSendOutcome::Stalled), - "expected Stalled, got {:?}", - outcome - ); - } - #[tokio::test] async fn full_file_serves_200_with_headers_and_body() { let data = two_hundred_kb(); diff --git a/crates/dragonfly-server/src/lib.rs b/crates/dragonfly-server/src/lib.rs index 3936574a..d8caa78b 100644 --- a/crates/dragonfly-server/src/lib.rs +++ b/crates/dragonfly-server/src/lib.rs @@ -1529,6 +1529,29 @@ pub async fn run() -> anyhow::Result<()> { }); }; + // TCP keepalive + NODELAY on every accepted connection. + // + // Keepalive is the *correct* dead-peer detector: it probes the socket and + // only closes it when the peer is truly unresponsive (no ACKs). A slow + // client (e.g. BIOS iPXE on a contended box) still ACKs, so it is never + // harmed — unlike a progress-based stall timeout, which cannot tell "slow" + // from "dead" and aborts healthy-but-slow transfers mid-download. Stalled + // in-flight streams are reaped by TCP retransmission; idle-vanished peers + // are reaped here (~60s idle + a few probes), not after the OS default of + // hours. + use axum::serve::ListenerExt; + let listener = listener.tap_io(|stream| { + let keepalive = socket2::TcpKeepalive::new() + .with_time(std::time::Duration::from_secs(60)) + .with_interval(std::time::Duration::from_secs(15)); + if let Err(e) = socket2::SockRef::from(&*stream).set_tcp_keepalive(&keepalive) { + debug!(error = %e, "failed to set TCP keepalive on incoming connection"); + } + if let Err(e) = stream.set_nodelay(true) { + debug!(error = %e, "failed to set TCP_NODELAY on incoming connection"); + } + }); + axum::serve( listener, app.into_make_service_with_connect_info::(),