diff --git a/.gitignore b/.gitignore index 9280984..456ad49 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ target/ __pycache__/ +.worktrees/ diff --git a/Cargo.lock b/Cargo.lock index 98211e8..e55213b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -319,6 +319,7 @@ dependencies = [ "sha2", "tempfile", "thiserror 1.0.69", + "time", "tokio", "tower", "tower-http", diff --git a/Cargo.toml b/Cargo.toml index e95dc6b..a2b251f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ anyhow = "1" thiserror = "1" async-trait = "0.1" rand = "0.8" +time = { version = "0.3", features = ["formatting"] } # Temp files for thumbnail extraction tempfile = "3" diff --git a/README.md b/README.md index 851ea00..afd502e 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,12 @@ Rust upload data plane for Divine Blossom media uploads. It does not own the Blossom control plane. `divine-blossom` remains the Fastly-facing service that answers client control-plane requests such as `HEAD /upload`, validates Blossom auth, and proxies short `init` and `complete` calls to this service. +In the approved Divine resumable flow: + +- `https://media.divine.video` is the client-facing control plane +- `https://upload.divine.video` is the opaque resumable session data plane +- `uploadUrl` values returned to clients are server-issued session URLs and must be treated as opaque + ## Runtime Configuration The service reads configuration from environment variables: @@ -27,6 +33,9 @@ The service reads configuration from environment variables: - `TRANSCRIBER_URL` - `RESUMABLE_SESSION_TTL_SECS` - `RESUMABLE_CHUNK_SIZE` +- `RESUMABLE_MAX_REQUEST_BODY_SIZE` + +`RESUMABLE_CHUNK_SIZE` is capped to `RESUMABLE_MAX_REQUEST_BODY_SIZE` before it is advertised in `/upload/init`, so the published chunk contract cannot exceed the actual request-body limit on `upload.divine.video`. The default route limit is `1048576` bytes, matching the current production nginx ingress behavior. `UPLOAD_ROUTE_MAX_BODY_SIZE` is also accepted as a compatibility alias. ## Development diff --git a/docs/superpowers/plans/2026-03-28-divine-upload-server-resumable-contract-alignment.md b/docs/superpowers/plans/2026-03-28-divine-upload-server-resumable-contract-alignment.md new file mode 100644 index 0000000..2deefc7 --- /dev/null +++ b/docs/superpowers/plans/2026-03-28-divine-upload-server-resumable-contract-alignment.md @@ -0,0 +1,371 @@ +# Divine Upload Server Resumable Contract Alignment Implementation Plan + +> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Align `divine-upload-server` with the approved Divine resumable upload contract so `divine-blossom` can proxy the public control plane without reshaping data-plane responses. + +**Architecture:** Keep this repo as the resumable data plane on `upload.divine.video` and leave client-facing `HEAD /upload`, `POST /upload/init`, and `POST /upload/{uploadId}/complete` ownership with `divine-blossom`. In this repo, align JSON field names, expiry formatting, session headers, and completion payload shape to the approved contract while preserving legacy single-shot upload behavior. + +**Tech Stack:** Rust, Axum, Serde, Tokio, Google Cloud Storage, Python unittest + +--- + +**Scope split:** This plan covers `divine-upload-server` only. `divine-blossom` still needs a separate implementation plan for the public control-plane routes on `media.divine.video`. + +**File Structure** + +- `src/resumable.rs` + - owns resumable request and response models, expiry formatting, session state, and completion metadata +- `src/main.rs` + - owns HTTP headers and handler responses for session `HEAD` and `PUT` +- `src/landing.html` + - owns the operator-facing endpoint description for the upload host +- `README.md` + - owns the repo boundary and deployment contract documentation +- `tests/test_export_video_upload_hashes.py` + - unchanged behavior safety net for Python tooling + +## Chunk 1: Contract Models And Expiry Format + +### Task 1: Make resumable init request and response JSON match the approved contract + +**Files:** +- Modify: `src/resumable.rs` +- Test: `src/resumable.rs` + +- [ ] **Step 1: Write the failing serialization tests** + +```rust +#[test] +fn init_contract_request_accepts_camel_case_fields() { + let payload = serde_json::json!({ + "sha256": "abc", + "size": 12, + "contentType": "video/mp4", + "fileName": "clip.mp4" + }); + + let request: ResumableUploadInitRequest = serde_json::from_value(payload) + .expect("camelCase init request should deserialize"); + + assert_eq!(request.content_type, "video/mp4"); + assert_eq!(request.file_name.as_deref(), Some("clip.mp4")); +} + +#[test] +fn init_contract_response_serializes_camel_case_fields() { + let response = ResumableUploadInitResponse { + upload_id: "up_123".to_string(), + upload_url: "https://upload.divine.video/sessions/up_123".to_string(), + expires_at: "2026-03-28T04:00:00Z".to_string(), + chunk_size: 8 * 1024 * 1024, + next_offset: 0, + required_headers: std::collections::HashMap::new(), + capabilities: ResumableCapabilities { + resume: true, + query_offset: true, + }, + }; + + let json = serde_json::to_value(response).expect("serialize response"); + + assert!(json.get("uploadId").is_some()); + assert!(json.get("uploadUrl").is_some()); + assert!(json.get("expiresAt").is_some()); + assert!(json.get("chunkSize").is_some()); + assert!(json.get("nextOffset").is_some()); + assert!(json.get("capabilities").is_some()); + assert!(json.get("upload_id").is_none()); +} +``` + +- [ ] **Step 2: Run the targeted Rust tests to verify they fail** + +Run: `cargo test init_contract_ -- --nocapture` +Expected: FAIL because the current models only expose `snake_case` fields and have no `capabilities` object. + +- [ ] **Step 3: Implement the minimal model changes** + +```rust +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ResumableUploadInitRequest { + pub sha256: String, + pub size: u64, + #[serde(alias = "content_type")] + pub content_type: String, + #[serde(alias = "file_name")] + pub file_name: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ResumableCapabilities { + pub resume: bool, + pub query_offset: bool, +} +``` + +- [ ] **Step 4: Run the targeted Rust tests to verify they pass** + +Run: `cargo test init_contract_ -- --nocapture` +Expected: PASS + +- [ ] **Step 5: Commit the contract model changes** + +```bash +git add src/resumable.rs +git commit -m "feat: align resumable init payloads with divine contract" +``` + +### Task 2: Emit RFC 3339 expiries for init and session status + +**Files:** +- Modify: `src/resumable.rs` +- Test: `src/resumable.rs` + +- [ ] **Step 1: Write the failing expiry-format tests** + +```rust +#[test] +fn rfc3339_expiry_helper_formats_contract_timestamp() { + let formatted = format_epoch_secs_as_rfc3339(1_774_660_000); + assert_eq!(formatted, "2026-03-28T00:40:00Z"); +} + +#[tokio::test] +async fn rfc3339_expiry_is_used_in_session_status() { + let manager = manager(); + let response = manager + .init_session( + "owner_pubkey", + ResumableUploadInitRequest { + sha256: "5b48aa1fcf30af61243ac9307eb98b7fa22df1c58573c3ca5d1b14fc30099929" + .to_string(), + size: 1024 * 1024, + content_type: "video/mp4".to_string(), + file_name: None, + }, + ) + .await + .expect("init response"); + let auth = response.required_headers.get("Authorization").unwrap().to_string(); + let head = manager + .head_session(&response.upload_id, Some(&auth)) + .await + .expect("head session"); + + assert!(response.expires_at.ends_with('Z')); + assert!(head.expires_at.ends_with('Z')); +} +``` + +- [ ] **Step 2: Run the targeted Rust tests to verify they fail** + +Run: `cargo test rfc3339_expiry_ -- --nocapture` +Expected: FAIL because the current code emits epoch-second strings. + +- [ ] **Step 3: Implement the minimal formatting helper and response changes** + +```rust +fn format_epoch_secs_as_rfc3339(epoch_secs: u64) -> String { + time::OffsetDateTime::from_unix_timestamp(epoch_secs as i64) + .expect("valid epoch seconds") + .format(&time::format_description::well_known::Rfc3339) + .expect("rfc3339 timestamp") +} +``` + +- [ ] **Step 4: Run the targeted Rust tests to verify they pass** + +Run: `cargo test rfc3339_expiry_ -- --nocapture` +Expected: PASS + +- [ ] **Step 5: Commit the expiry-format changes** + +```bash +git add src/resumable.rs Cargo.toml Cargo.lock +git commit -m "feat: emit rfc3339 resumable expiry timestamps" +``` + +## Chunk 2: Session Headers And Completion Response + +### Task 3: Add the approved session expiry header to session `HEAD` and chunk `PUT` responses + +**Files:** +- Modify: `src/resumable.rs` +- Modify: `src/main.rs` +- Test: `src/main.rs` + +- [ ] **Step 1: Write the failing header test** + +```rust +#[test] +fn session_responses_include_upload_expires_at_header() { + let response = build_session_status_response(UploadSessionStatus { + next_offset: 0, + declared_size: 1024, + expires_at: "2026-03-28T00:40:00Z".to_string(), + chunk_size: 8 * 1024 * 1024, + }); + + assert_eq!( + response.headers().get("Upload-Expires-At").unwrap(), + "2026-03-28T00:40:00Z" + ); +} +``` + +- [ ] **Step 2: Run the targeted Rust test to verify it fails** + +Run: `cargo test session_responses_include_upload_expires_at_header -- --nocapture` +Expected: FAIL because the current responses only emit `Upload-Expires`. + +- [ ] **Step 3: Implement the minimal header helper and wire both handlers through it** + +```rust +const SESSION_EXPIRES_AT_HEADER: &str = "Upload-Expires-At"; +``` + +- [ ] **Step 4: Run the targeted Rust test to verify it passes** + +Run: `cargo test session_responses_include_upload_expires_at_header -- --nocapture` +Expected: PASS + +- [ ] **Step 5: Commit the session-header changes** + +```bash +git add src/main.rs src/resumable.rs +git commit -m "feat: advertise upload session expiry with contract header" +``` + +### Task 4: Return a mobile-compatible completion descriptor from resumable `complete` + +**Files:** +- Modify: `src/resumable.rs` +- Modify: `src/main.rs` +- Test: `src/resumable.rs` + +- [ ] **Step 1: Write the failing completion-shape test** + +```rust +#[test] +fn complete_response_serializes_public_descriptor_fields() { + let response = CompleteUploadResponse { + url: "https://media.divine.video/abc".to_string(), + fallback_url: Some("https://media.divine.video/abc".to_string()), + thumbnail: Some("https://media.divine.video/abc.jpg".to_string()), + streaming: Some(StreamingInfo { + hls_url: None, + mp4_url: None, + thumbnail_url: Some("https://media.divine.video/abc.jpg".to_string()), + status: Some("processing".to_string()), + }), + }; + + let json = serde_json::to_value(response).expect("serialize complete response"); + + assert_eq!(json.get("url").unwrap(), "https://media.divine.video/abc"); + assert_eq!(json.get("fallbackUrl").unwrap(), "https://media.divine.video/abc"); + assert!(json.get("streaming").is_some()); +} +``` + +- [ ] **Step 2: Run the targeted Rust test to verify it fails** + +Run: `cargo test complete_response_serializes_public_descriptor_fields -- --nocapture` +Expected: FAIL because the current completion response still exposes internal blob metadata fields. + +- [ ] **Step 3: Implement the minimal completion response rewrite** + +```rust +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct StreamingInfo { + pub hls_url: Option, + pub mp4_url: Option, + pub thumbnail_url: Option, + pub status: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CompleteUploadResponse { + pub url: String, + pub fallback_url: Option, + pub thumbnail: Option, + pub streaming: Option, +} +``` + +- [ ] **Step 4: Run the targeted Rust test to verify it passes** + +Run: `cargo test complete_response_serializes_public_descriptor_fields -- --nocapture` +Expected: PASS + +- [ ] **Step 5: Commit the completion response changes** + +```bash +git add src/resumable.rs src/main.rs +git commit -m "feat: return public completion descriptor for resumable uploads" +``` + +## Chunk 3: Docs, Landing Page, And Full Verification + +### Task 5: Update docs so the upload host no longer claims the public control-plane role + +**Files:** +- Modify: `README.md` +- Modify: `src/landing.html` + +- [ ] **Step 1: Write the wording changes** + +```text +Control plane: media.divine.video +Data plane: upload.divine.video +Opaque session URLs are server-issued and resumable-session specific. +``` + +- [ ] **Step 2: Review the rendered copy in the edited files** + +Run: `sed -n '1,220p' README.md && sed -n '90,150p' src/landing.html` +Expected: Both files clearly describe `divine-blossom` as the public control plane and `divine-upload-server` as the opaque session data plane. + +- [ ] **Step 3: Commit the doc updates** + +```bash +git add README.md src/landing.html +git commit -m "docs: clarify divine upload control-plane boundary" +``` + +### Task 6: Run the full verification suite + +**Files:** +- Modify: none + +- [ ] **Step 1: Check formatting** + +Run: `cargo fmt --all -- --check` +Expected: PASS + +- [ ] **Step 2: Run clippy** + +Run: `cargo clippy --all-targets -- -D warnings` +Expected: PASS + +- [ ] **Step 3: Run Rust tests** + +Run: `cargo test --all` +Expected: PASS + +- [ ] **Step 4: Run Python tests** + +Run: `python3 -m unittest discover -s tests -p 'test_*.py'` +Expected: PASS + +- [ ] **Step 5: Commit any final fixups if verification required them** + +```bash +git add src/resumable.rs src/main.rs README.md src/landing.html Cargo.toml Cargo.lock +git commit -m "chore: finalize resumable contract alignment" +``` diff --git a/docs/superpowers/specs/2026-03-28-divine-upload-server-resumable-contract-alignment-design.md b/docs/superpowers/specs/2026-03-28-divine-upload-server-resumable-contract-alignment-design.md new file mode 100644 index 0000000..11ad69f --- /dev/null +++ b/docs/superpowers/specs/2026-03-28-divine-upload-server-resumable-contract-alignment-design.md @@ -0,0 +1,194 @@ +Status: Approved + +# Divine Upload Server Resumable Contract Alignment Design + +**Problem** + +`divine-upload-server` is live on `upload.divine.video`, but the public resumable upload contract approved for the mobile client is defined around a control plane on `media.divine.video` and an opaque data plane on `upload.divine.video`. Today the live services do not line up with that contract: `divine-blossom` is not advertising or serving the resumable control-plane routes, and this repo still exposes several draft-era data-plane mismatches in JSON shape, expiry format, and completion response shape. + +**Goals** + +- Keep `media.divine.video` as the canonical client-facing Blossom control plane. +- Keep `upload.divine.video` as the opaque resumable session data plane. +- Make this repo's resumable payloads and headers line up with the approved Divine resumable session draft closely enough that `divine-blossom` can proxy them without lossy translation. +- Preserve legacy single-shot `PUT /upload` behavior. +- Keep session uploads non-public and only publish the final verified blob. + +**Non-Goals** + +- Move `HEAD /upload`, `POST /upload/init`, or `POST /upload/{uploadId}/complete` ownership from `divine-blossom` to this repo's public responsibility. +- Teach the mobile client a tus-style or upload-host-specific protocol. +- Re-architect the existing single-shot upload path in this pass. +- Solve the entire cross-repo rollout inside this repository alone. + +**Approved Direction** + +We will keep the draft Divine resumable contract as the source of truth. + +- `divine-blossom` remains responsible for: + - `HEAD /upload` capability discovery on `media.divine.video` + - client-facing `POST /upload/init` + - client-facing `POST /upload/{uploadId}/complete` + - client-facing `DELETE /upload/{uploadId}` +- `divine-upload-server` remains responsible for: + - creating and tracking resumable upload sessions + - accepting `PUT` and `HEAD` on opaque session URLs under `upload.divine.video` + - finalizing verified uploads into canonical storage + - returning completion metadata that `divine-blossom` can proxy with minimal or no shape translation + +**Why This Direction** + +Option 1 is the only direction that matches the approved mobile spec in `divine-mobile` and preserves the intended Blossom boundary. Treating the currently live upload host as the protocol source of truth would force a mobile protocol fork, invalidate the March 26 design and plan, and couple the client to an unfinished backend shape rather than to the intended Divine contract. + +**Current State** + +- `README.md` already documents this repo as the data plane behind `upload.divine.video`, with `divine-blossom` owning `HEAD /upload` and the control-plane proxy role. +- `src/main.rs` already implements: + - `POST /upload/init` + - `POST /upload/:upload_id/complete` + - `DELETE /upload/:upload_id` + - `PUT /sessions/:upload_id` + - `HEAD /sessions/:upload_id` +- Live probes on March 28, 2026 show: + - `HEAD https://media.divine.video/upload` returns `200` without Divine capability headers + - `POST https://media.divine.video/upload/init` returns `404` + - `HEAD https://upload.divine.video/upload` returns `405` with `Allow: PUT,OPTIONS` + +**Observed Gaps In This Repo** + +1. Request and response JSON are still Rust-style `snake_case`, while the approved contract is `camelCase`. +2. Expiry values are emitted as raw epoch-second strings, while the approved contract uses RFC 3339 timestamps. +3. Session progress headers currently use `Upload-Expires`; the approved contract calls for `Upload-Expires-At`. +4. `POST /upload/{uploadId}/complete` returns internal blob metadata, not the public-facing Blossom descriptor shape expected by the mobile upload service. +5. The landing page overstates public endpoint ownership and does not explain the control-plane/data-plane split clearly enough. + +**Design** + +## 1. Keep The Repo Boundary Intact + +This repo will not become the public control plane. The public client contract still lives on `media.divine.video`, served by `divine-blossom`. + +Inside that split, this repo should make its resumable handler contract proxy-friendly: + +- accept the same field names the public control plane expects +- return the same field names the public control plane wants to forward +- expose the same session progress headers the mobile client is built against + +That reduces duplicated translation logic and lowers the risk of drift between repos. + +## 2. Align Init Payloads With The Draft Contract + +`ResumableUploadInitRequest` and `ResumableUploadInitResponse` in `src/resumable.rs` should become `camelCase` at the JSON boundary: + +- request: + - `sha256` + - `size` + - `contentType` + - `fileName` +- response: + - `uploadId` + - `uploadUrl` + - `expiresAt` + - `chunkSize` + - `nextOffset` + - `requiredHeaders` + - `capabilities` + +Compatibility detail: + +- accept legacy `content_type` and `file_name` as aliases during the transition so existing internal callers do not break accidentally +- emit only the approved `camelCase` names in responses + +## 3. Emit RFC 3339 Expiry Values + +The approved contract examples use timestamps such as `2026-03-26T15:00:00Z`, not epoch strings. + +This repo should therefore: + +- return `expiresAt` as RFC 3339 in init responses +- return `Upload-Expires-At` as RFC 3339 in session `HEAD` and chunk `PUT` responses + +For rollout safety, the data plane may also keep emitting `Upload-Expires` temporarily as a compatibility header if doing so is cheap, but the approved contract header must be present and tested. + +## 4. Return A Public Descriptor Shape From Complete + +The completion handler should return a response shaped for the existing mobile parser: + +- top-level `url` +- top-level `fallbackUrl` +- optional top-level `thumbnail` +- optional `streaming` object with: + - `hlsUrl` + - `mp4Url` + - `thumbnailUrl` or `thumbnail` + - `status` + +For this repo's first alignment pass: + +- `fallbackUrl` should point at the canonical media URL on `media.divine.video` +- `url` should also be set so existing clients always have a primary URL +- `streaming.status` may be `"processing"` when transcoding has been triggered but stream assets are not yet known +- `thumbnail` should reuse the existing thumbnail generation logic when available +- `dim` and other internal metadata can remain available internally, but the proxied completion contract should prioritize the public descriptor shape + +This keeps the response consumable by the current mobile upload service without forcing `divine-blossom` to manufacture fields it does not own. + +## 5. Clarify Docs And Operator Surfaces + +`README.md` and `src/landing.html` should be updated so they stop implying that clients discover resumable capability on the upload host. + +They should state clearly: + +- `media.divine.video` is the control plane +- `upload.divine.video` is the opaque session data plane +- session URLs are for server-issued resumable uploads only + +That makes the live service behavior easier to reason about during rollout and reduces confusion from probe-based debugging. + +**File Boundaries** + +- `src/resumable.rs` + - request/response schema alignment + - expiry formatting helpers + - completion response contract shape + - unit tests for serialization and completion metadata +- `src/main.rs` + - resumable response headers + - handler wiring if response structs change + - route-level tests where needed +- `README.md` + - public ownership clarification +- `src/landing.html` + - endpoint and control-plane/data-plane wording cleanup + +**Verification** + +- Rust unit tests in `src/resumable.rs` for: + - camelCase request parsing + - camelCase init response serialization + - RFC 3339 expiry formatting + - completion response shape +- Rust tests in `src/main.rs` or extracted helpers for: + - `Upload-Expires-At` presence on session responses + - compatibility header behavior if retained +- Full repo verification: + - `cargo fmt --all -- --check` + - `cargo clippy --all-targets -- -D warnings` + - `cargo test --all` + - `python3 -m unittest discover -s tests -p 'test_*.py'` + +**Cross-Repo Dependency** + +This repository alone cannot satisfy the mobile resumable contract. + +Separate `divine-blossom` work is still required to: + +- advertise capability headers on `HEAD /upload` +- expose public `init`, `complete`, and abort routes on `media.divine.video` +- proxy those routes to this service without distorting the approved contract + +That work should be tracked in a separate spec and implementation plan in the `divine-blossom` repository. + +**Recommendation** + +Align this repo to the approved Divine resumable draft now, then land the matching `divine-blossom` control-plane proxy work. That preserves the March 26 mobile contract and turns the current production gap into a deployment-alignment problem instead of a client-protocol rewrite. diff --git a/src/landing.html b/src/landing.html index 7e7534e..7e5f49e 100644 --- a/src/landing.html +++ b/src/landing.html @@ -93,9 +93,8 @@

upload.divine.video

Blossom media upload server for divine.video
- This is the media upload endpoint for the divine.video platform. - It implements the Blossom protocol for - content-addressed blob storage, with support for resumable uploads, automatic thumbnail extraction, and video transcoding. + This is the resumable upload data plane for the divine.video platform. + The public Blossom control plane remains on media.divine.video; this host accepts server-issued session URLs and final blob uploads behind that control plane.
@@ -108,7 +107,7 @@

API Endpoints

POST /upload/init -
Initialize a resumable upload session
+
Internal/proxied resumable session initialization
PUT @@ -123,12 +122,12 @@

API Endpoints

POST /upload/:id/complete -
Finalize a resumable upload
+
Internal/proxied resumable upload finalization
DELETE /upload/:id -
Abort a resumable upload
+
Abort a server-issued resumable upload session
diff --git a/src/main.rs b/src/main.rs index e1d287d..034d16f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -41,6 +41,8 @@ use tower::Service; use tower_http::cors::{Any, CorsLayer}; use tracing::{error, info, warn}; +const DEFAULT_UPLOAD_ROUTE_MAX_BODY_SIZE: u64 = 1024 * 1024; + // Configuration #[derive(Clone)] struct Config { @@ -79,14 +81,40 @@ impl Config { .ok() .and_then(|value| value.parse().ok()) .unwrap_or(resumable::DEFAULT_RESUMABLE_SESSION_TTL_SECS), - resumable_chunk_size: env::var("RESUMABLE_CHUNK_SIZE") - .ok() - .and_then(|value| value.parse().ok()) - .unwrap_or(resumable::DEFAULT_RESUMABLE_CHUNK_SIZE), + resumable_chunk_size: resolve_resumable_chunk_size( + env::var("RESUMABLE_CHUNK_SIZE") + .ok() + .and_then(|value| value.parse().ok()) + .filter(|value: &u64| *value > 0) + .unwrap_or(resumable::DEFAULT_RESUMABLE_CHUNK_SIZE), + load_resumable_max_request_body_size(), + ), } } } +fn load_resumable_max_request_body_size() -> u64 { + [ + "RESUMABLE_MAX_REQUEST_BODY_SIZE", + "UPLOAD_ROUTE_MAX_BODY_SIZE", + ] + .into_iter() + .find_map(|name| { + env::var(name) + .ok() + .and_then(|value| value.parse().ok()) + .filter(|value: &u64| *value > 0) + }) + .unwrap_or(DEFAULT_UPLOAD_ROUTE_MAX_BODY_SIZE) +} + +fn resolve_resumable_chunk_size( + configured_chunk_size: u64, + upload_route_max_body_size: u64, +) -> u64 { + configured_chunk_size.min(upload_route_max_body_size) +} + // App state shared across handlers struct AppState { gcs_client: GcsClient, @@ -148,6 +176,16 @@ struct ErrorResponse { const BLOSSOM_AUTH_KIND: u32 = 24242; +fn cors_exposed_upload_headers() -> Vec { + vec![ + HeaderName::from_static("upload-offset"), + HeaderName::from_static("upload-length"), + HeaderName::from_static("upload-expires"), + HeaderName::from_static("upload-expires-at"), + HeaderName::from_static("x-divine-chunk-size"), + ] +} + #[tokio::main] async fn main() -> Result<()> { // Initialize tracing @@ -183,12 +221,7 @@ async fn main() -> Result<()> { header::CONTENT_TYPE, header::CONTENT_RANGE, ]) - .expose_headers([ - HeaderName::from_static("upload-offset"), - HeaderName::from_static("upload-length"), - HeaderName::from_static("upload-expires"), - HeaderName::from_static("x-divine-chunk-size"), - ]) + .expose_headers(cors_exposed_upload_headers()) .max_age(std::time::Duration::from_secs(86400)); // Build router @@ -295,6 +328,32 @@ fn header_value(value: u64) -> HeaderValue { HeaderValue::from_str(&value.to_string()).expect("numeric header values must be valid") } +fn build_session_status_response(status: resumable::UploadSessionStatus) -> Response { + let mut response = Response::new(Body::empty()); + *response.status_mut() = StatusCode::NO_CONTENT; + response.headers_mut().insert( + resumable::SESSION_OFFSET_HEADER, + header_value(status.next_offset), + ); + response.headers_mut().insert( + resumable::SESSION_LENGTH_HEADER, + header_value(status.declared_size), + ); + let expires_at = HeaderValue::from_str(&status.expires_at) + .expect("session expiry header must be valid ASCII"); + response + .headers_mut() + .insert(resumable::SESSION_EXPIRES_AT_HEADER, expires_at.clone()); + response + .headers_mut() + .insert(resumable::SESSION_EXPIRES_HEADER, expires_at); + response.headers_mut().insert( + resumable::SESSION_CHUNK_SIZE_HEADER, + header_value(status.chunk_size), + ); + response +} + /// POST /audit - Receive audit log entries from Fastly edge and write as structured logs. /// Google Cloud container logging: JSON on stdout is auto-ingested by Cloud Logging. /// This gives us: queryable logs, retention policies, export to BigQuery, alerting. @@ -354,6 +413,7 @@ fn resumable_manager( ), resumable::GcsSessionStore::new(state.gcs_client.clone(), state.config.gcs_bucket.clone()), state.config.upload_base_url.clone(), + state.config.cdn_base_url.clone(), state.config.resumable_chunk_size, state.config.resumable_session_ttl_secs, ) @@ -399,28 +459,7 @@ async fn handle_session_head( ) .await { - Ok(status) => { - let mut response = Response::new(Body::empty()); - *response.status_mut() = StatusCode::NO_CONTENT; - response.headers_mut().insert( - resumable::SESSION_OFFSET_HEADER, - header_value(status.next_offset), - ); - response.headers_mut().insert( - resumable::SESSION_LENGTH_HEADER, - header_value(status.declared_size), - ); - response.headers_mut().insert( - resumable::SESSION_EXPIRES_HEADER, - HeaderValue::from_str(&status.expires_at) - .expect("session expiry header must be valid ASCII"), - ); - response.headers_mut().insert( - resumable::SESSION_CHUNK_SIZE_HEADER, - header_value(status.chunk_size), - ); - response - } + Ok(status) => build_session_status_response(status), Err(error) => resumable_error_response(error), } } @@ -465,28 +504,7 @@ async fn handle_session_chunk( ) .await { - Ok(status) => { - let mut response = Response::new(Body::empty()); - *response.status_mut() = StatusCode::NO_CONTENT; - response.headers_mut().insert( - resumable::SESSION_OFFSET_HEADER, - header_value(status.next_offset), - ); - response.headers_mut().insert( - resumable::SESSION_LENGTH_HEADER, - header_value(status.declared_size), - ); - response.headers_mut().insert( - resumable::SESSION_EXPIRES_HEADER, - HeaderValue::from_str(&status.expires_at) - .expect("session expiry header must be valid ASCII"), - ); - response.headers_mut().insert( - resumable::SESSION_CHUNK_SIZE_HEADER, - header_value(status.chunk_size), - ); - response - } + Ok(status) => build_session_status_response(status), Err(error) => resumable_error_response(error), } } @@ -1118,7 +1136,11 @@ async fn probe_video_dimensions(video_bytes: &[u8]) -> Result { #[cfg(test)] #[allow(clippy::items_after_test_module)] mod tests { - use super::{media_source_candidates, new_temp_media_path}; + use super::{ + build_session_status_response, cors_exposed_upload_headers, media_source_candidates, + new_temp_media_path, resolve_resumable_chunk_size, + }; + use crate::resumable; #[test] fn temp_media_paths_are_unique_per_request() { @@ -1141,6 +1163,47 @@ mod tests { assert_eq!(candidates[1], format!("{}/hls/stream_720p.ts", hash)); assert_eq!(candidates[2], format!("{}/hls/stream_480p.ts", hash)); } + + #[test] + fn session_responses_include_upload_expires_at_header() { + let response = build_session_status_response(resumable::UploadSessionStatus { + next_offset: 0, + declared_size: 1024, + expires_at: "2026-03-28T00:40:00Z".to_string(), + chunk_size: 8 * 1024 * 1024, + }); + + assert_eq!( + response + .headers() + .get("Upload-Expires-At") + .expect("contract expiry header"), + "2026-03-28T00:40:00Z" + ); + } + + #[test] + fn cors_exposes_upload_expires_at_header() { + assert!(cors_exposed_upload_headers() + .iter() + .any(|header| header.as_str() == "upload-expires-at")); + } + + #[test] + fn advertised_chunk_size_is_capped_to_upload_route_body_limit() { + assert_eq!( + resolve_resumable_chunk_size(8 * 1024 * 1024, 1024 * 1024), + 1024 * 1024 + ); + } + + #[test] + fn advertised_chunk_size_keeps_smaller_configured_value() { + assert_eq!( + resolve_resumable_chunk_size(512 * 1024, 1024 * 1024), + 512 * 1024 + ); + } } fn validate_auth(headers: &axum::http::HeaderMap, required_action: &str) -> Result { diff --git a/src/resumable.rs b/src/resumable.rs index f633df9..3adfdee 100644 --- a/src/resumable.rs +++ b/src/resumable.rs @@ -23,24 +23,30 @@ use std::{ collections::HashMap, time::{SystemTime, UNIX_EPOCH}, }; +use time::{format_description::well_known::Rfc3339, OffsetDateTime}; pub const DEFAULT_RESUMABLE_CHUNK_SIZE: u64 = 8 * 1024 * 1024; pub const DEFAULT_RESUMABLE_SESSION_TTL_SECS: u64 = 24 * 60 * 60; pub const SESSION_OFFSET_HEADER: &str = "Upload-Offset"; pub const SESSION_LENGTH_HEADER: &str = "Upload-Length"; pub const SESSION_EXPIRES_HEADER: &str = "Upload-Expires"; +pub const SESSION_EXPIRES_AT_HEADER: &str = "Upload-Expires-At"; pub const SESSION_CHUNK_SIZE_HEADER: &str = "X-Divine-Chunk-Size"; #[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub struct ResumableUploadInitRequest { pub sha256: String, pub size: u64, + #[serde(alias = "content_type")] pub content_type: String, + #[serde(alias = "file_name")] #[serde(skip_serializing_if = "Option::is_none")] pub file_name: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub struct ResumableUploadInitResponse { pub upload_id: String, pub upload_url: String, @@ -49,17 +55,40 @@ pub struct ResumableUploadInitResponse { pub next_offset: u64, #[serde(default)] pub required_headers: HashMap, + #[serde(default)] + pub capabilities: ResumableCapabilities, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +pub struct ResumableCapabilities { + pub resume: bool, + pub query_offset: bool, } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CompleteUploadResponse { - pub sha256: String, - pub size: u64, - pub content_type: String, +#[serde(rename_all = "camelCase")] +pub struct StreamingInfo { + #[serde(skip_serializing_if = "Option::is_none")] + pub hls_url: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub mp4_url: Option, #[serde(skip_serializing_if = "Option::is_none")] pub thumbnail_url: Option, #[serde(skip_serializing_if = "Option::is_none")] - pub dim: Option, + pub status: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CompleteUploadResponse { + pub url: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub fallback_url: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub thumbnail: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub streaming: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -193,6 +222,7 @@ pub struct ResumableManager { backend: B, store: S, upload_base_url: String, + cdn_base_url: String, chunk_size: u64, session_ttl_secs: u64, } @@ -206,6 +236,7 @@ where backend: B, store: S, upload_base_url: impl Into, + cdn_base_url: impl Into, chunk_size: u64, session_ttl_secs: u64, ) -> Self { @@ -213,6 +244,7 @@ where backend, store, upload_base_url: upload_base_url.into().trim_end_matches('/').to_string(), + cdn_base_url: cdn_base_url.into().trim_end_matches('/').to_string(), chunk_size, session_ttl_secs, } @@ -262,10 +294,14 @@ where Ok(ResumableUploadInitResponse { upload_id: upload_id.clone(), upload_url: format!("{}/sessions/{}", self.upload_base_url, upload_id), - expires_at: expires_at_epoch_secs.to_string(), + expires_at: format_epoch_secs_as_rfc3339(expires_at_epoch_secs), chunk_size: self.chunk_size, next_offset: 0, required_headers, + capabilities: ResumableCapabilities { + resume: true, + query_offset: true, + }, }) } @@ -279,7 +315,7 @@ where Ok(UploadSessionStatus { next_offset: session.next_offset, declared_size: session.declared_size, - expires_at: session.expires_at_epoch_secs.to_string(), + expires_at: format_epoch_secs_as_rfc3339(session.expires_at_epoch_secs), chunk_size: self.chunk_size, }) } @@ -337,7 +373,7 @@ where Ok(UploadSessionStatus { next_offset: session.next_offset, declared_size: session.declared_size, - expires_at: session.expires_at_epoch_secs.to_string(), + expires_at: format_epoch_secs_as_rfc3339(session.expires_at_epoch_secs), chunk_size: self.chunk_size, }) } @@ -373,13 +409,7 @@ where } if let Some(finalized_object) = session.finalized_object.as_ref() { - return Ok(CompleteUploadResponse { - sha256: finalized_object.clone(), - size: session.declared_size, - content_type: session.content_type.clone(), - thumbnail_url: None, - dim: None, - }); + return Ok(self.build_complete_upload_response(finalized_object, &session.content_type)); } let (computed_hash, computed_size) = self @@ -423,13 +453,7 @@ where session.finalized_object = Some(session.final_sha256.clone()); self.store.save(&session).await.map_err(internal_error)?; - Ok(CompleteUploadResponse { - sha256: session.final_sha256.clone(), - size: session.declared_size, - content_type: session.content_type.clone(), - thumbnail_url: None, - dim: None, - }) + Ok(self.build_complete_upload_response(&session.final_sha256, &session.content_type)) } pub async fn abort_session( @@ -490,6 +514,31 @@ where let _ = self.store.delete(&session.upload_id).await; } + fn build_complete_upload_response( + &self, + sha256: &str, + content_type: &str, + ) -> CompleteUploadResponse { + let canonical_url = format!("{}/{}", self.cdn_base_url, sha256); + let streaming = if is_video_content_type(content_type) { + Some(StreamingInfo { + hls_url: None, + mp4_url: None, + thumbnail_url: None, + status: Some("processing".to_string()), + }) + } else { + None + }; + + CompleteUploadResponse { + url: canonical_url.clone(), + fallback_url: Some(canonical_url), + thumbnail: None, + streaming, + } + } + async fn hash_temp_object(&self, object_key: &str) -> Result<(String, u64)> { let mut stream = self.backend.stream_object(object_key).await?; let mut hasher = Sha256::new(); @@ -833,10 +882,21 @@ fn now_epoch_secs() -> u64 { .as_secs() } +fn format_epoch_secs_as_rfc3339(epoch_secs: u64) -> String { + OffsetDateTime::from_unix_timestamp(epoch_secs as i64) + .expect("epoch seconds should produce a valid timestamp") + .format(&Rfc3339) + .expect("rfc3339 timestamp formatting should succeed") +} + fn parse_bearer_token(authorization: Option<&str>) -> Option<&str> { authorization.and_then(|value| value.strip_prefix("Bearer ")) } +fn is_video_content_type(content_type: &str) -> bool { + content_type.starts_with("video/") +} + fn internal_error(error: anyhow::Error) -> ResumableError { ResumableError::Internal(error.to_string()) } @@ -979,11 +1039,136 @@ mod tests { FakeBackend::default(), InMemoryStore::default(), "https://upload.divine.video", + "https://media.divine.video", DEFAULT_RESUMABLE_CHUNK_SIZE, DEFAULT_RESUMABLE_SESSION_TTL_SECS, ) } + #[test] + fn init_contract_request_accepts_camel_case_fields() { + let payload = serde_json::json!({ + "sha256": "5b48aa1fcf30af61243ac9307eb98b7fa22df1c58573c3ca5d1b14fc30099929", + "size": 12, + "contentType": "video/mp4", + "fileName": "clip.mp4" + }); + + let request: ResumableUploadInitRequest = + serde_json::from_value(payload).expect("camelCase init request should deserialize"); + + assert_eq!(request.content_type, "video/mp4"); + assert_eq!(request.file_name.as_deref(), Some("clip.mp4")); + } + + #[test] + fn init_contract_response_serializes_camel_case_fields() { + let response = ResumableUploadInitResponse { + upload_id: "up_123".to_string(), + upload_url: "https://upload.divine.video/sessions/up_123".to_string(), + expires_at: "2026-03-28T04:00:00Z".to_string(), + chunk_size: 8 * 1024 * 1024, + next_offset: 0, + required_headers: HashMap::new(), + capabilities: ResumableCapabilities::default(), + }; + + let json = serde_json::to_value(response).expect("serialize response"); + + assert!(json.get("uploadId").is_some()); + assert!(json.get("uploadUrl").is_some()); + assert!(json.get("expiresAt").is_some()); + assert!(json.get("chunkSize").is_some()); + assert!(json.get("nextOffset").is_some()); + assert!(json.get("upload_id").is_none()); + } + + #[tokio::test] + async fn init_contract_response_includes_capabilities_flags() { + let manager = manager(); + + let response = manager + .init_session( + "owner_pubkey", + ResumableUploadInitRequest { + sha256: "5b48aa1fcf30af61243ac9307eb98b7fa22df1c58573c3ca5d1b14fc30099929" + .to_string(), + size: 1024, + content_type: "video/mp4".to_string(), + file_name: Some("video.mp4".to_string()), + }, + ) + .await + .expect("init response"); + let json = serde_json::to_value(response).expect("serialize init response"); + + assert_eq!( + json.get("capabilities") + .and_then(|value| value.get("resume")) + .and_then(|value| value.as_bool()), + Some(true) + ); + assert_eq!( + json.get("capabilities") + .and_then(|value| value.get("queryOffset")) + .and_then(|value| value.as_bool()), + Some(true) + ); + } + + #[tokio::test] + async fn rfc3339_expiry_is_used_in_init_response() { + let manager = manager(); + + let response = manager + .init_session( + "owner_pubkey", + ResumableUploadInitRequest { + sha256: "5b48aa1fcf30af61243ac9307eb98b7fa22df1c58573c3ca5d1b14fc30099929" + .to_string(), + size: 1024, + content_type: "video/mp4".to_string(), + file_name: Some("video.mp4".to_string()), + }, + ) + .await + .expect("init response"); + + assert!(response.expires_at.contains('T')); + assert!(response.expires_at.ends_with('Z')); + } + + #[tokio::test] + async fn rfc3339_expiry_is_used_in_session_status() { + let manager = manager(); + let response = manager + .init_session( + "owner_pubkey", + ResumableUploadInitRequest { + sha256: "5b48aa1fcf30af61243ac9307eb98b7fa22df1c58573c3ca5d1b14fc30099929" + .to_string(), + size: 1024 * 1024, + content_type: "video/mp4".to_string(), + file_name: None, + }, + ) + .await + .expect("init response"); + let auth = response + .required_headers + .get("Authorization") + .expect("session auth") + .to_string(); + + let head = manager + .head_session(&response.upload_id, Some(&auth)) + .await + .expect("head session"); + + assert!(head.expires_at.contains('T')); + assert!(head.expires_at.ends_with('Z')); + } + #[tokio::test] async fn init_creates_session_and_returns_upload_url() { let manager = manager(); @@ -1057,6 +1242,47 @@ mod tests { assert_eq!(head.declared_size, 1024 * 1024); } + #[tokio::test] + async fn upload_chunk_accepts_configured_max_chunk_size() { + let manager = manager(); + let declared_size = DEFAULT_RESUMABLE_CHUNK_SIZE * 2; + let response = manager + .init_session( + "owner_pubkey", + ResumableUploadInitRequest { + sha256: "5b48aa1fcf30af61243ac9307eb98b7fa22df1c58573c3ca5d1b14fc30099929" + .to_string(), + size: declared_size, + content_type: "video/mp4".to_string(), + file_name: None, + }, + ) + .await + .expect("init response"); + let auth = response + .required_headers + .get("Authorization") + .expect("session auth") + .to_string(); + + let status = manager + .upload_chunk( + &response.upload_id, + Some(&auth), + &format!( + "bytes 0-{}/{}", + DEFAULT_RESUMABLE_CHUNK_SIZE - 1, + declared_size + ), + Bytes::from(vec![3u8; DEFAULT_RESUMABLE_CHUNK_SIZE as usize]), + ) + .await + .expect("max-sized chunk upload"); + + assert_eq!(status.next_offset, DEFAULT_RESUMABLE_CHUNK_SIZE); + assert_eq!(status.chunk_size, DEFAULT_RESUMABLE_CHUNK_SIZE); + } + #[tokio::test] async fn put_session_chunk_rejects_non_contiguous_ranges() { let manager = manager(); @@ -1101,4 +1327,66 @@ mod tests { assert!(matches!(error, ResumableError::RangeNotSatisfiable(_))); } + + #[tokio::test] + async fn complete_session_returns_public_descriptor_fields() { + let manager = manager(); + let response = manager + .init_session( + "owner_pubkey", + ResumableUploadInitRequest { + sha256: "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824" + .to_string(), + size: 5, + content_type: "video/mp4".to_string(), + file_name: Some("hello.mp4".to_string()), + }, + ) + .await + .expect("init response"); + let auth = response + .required_headers + .get("Authorization") + .expect("session auth") + .to_string(); + + manager + .upload_chunk( + &response.upload_id, + Some(&auth), + "bytes 0-4/5", + Bytes::from_static(b"hello"), + ) + .await + .expect("final chunk"); + + let complete = manager + .complete_session(&response.upload_id, "owner_pubkey") + .await + .expect("complete session"); + let json = serde_json::to_value(&complete).expect("serialize complete response"); + + assert_eq!( + complete.url, + "https://media.divine.video/2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824" + ); + assert_eq!( + complete.fallback_url.as_deref(), + Some( + "https://media.divine.video/2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824" + ) + ); + assert_eq!( + json.get("fallbackUrl").and_then(|value| value.as_str()), + Some( + "https://media.divine.video/2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824" + ) + ); + assert_eq!( + json.get("streaming") + .and_then(|value| value.get("status")) + .and_then(|value| value.as_str()), + Some("processing") + ); + } }