diff --git a/crates/arco-api/src/routes/orchestration.rs b/crates/arco-api/src/routes/orchestration.rs index 6c11d47..d1b9e8f 100644 --- a/crates/arco-api/src/routes/orchestration.rs +++ b/crates/arco-api/src/routes/orchestration.rs @@ -440,6 +440,18 @@ const MAX_LIST_LIMIT: u32 = 200; const DEFAULT_LIMIT: u32 = 50; const MAX_LOG_BYTES: usize = 2 * 1024 * 1024; +// Input size limits (defense-in-depth; avoid pathological CPU/memory in validation/fingerprinting). +const MAX_RUN_KEY_LEN: usize = 256; +const MAX_PARTITION_KEY_LEN: usize = 1024; +const MAX_SELECTION_ITEMS: usize = 10_000; +const MAX_SELECTION_ITEM_LEN: usize = 256; +const MAX_LABELS: usize = 128; +const MAX_LABEL_KEY_LEN: usize = 128; +const MAX_LABEL_VALUE_LEN: usize = 2048; +const MAX_PARTITIONS: usize = 128; +const MAX_PARTITION_DIM_LEN: usize = 64; +const MAX_PARTITION_VALUE_LEN: usize = 256; + /// Response for listing runs. #[derive(Debug, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] @@ -1933,6 +1945,92 @@ fn reject_reserved_lineage_labels(labels: &HashMap) -> Result<() ))) } +fn validate_selection_limits(selection: &[String]) -> Result<(), ApiError> { + if selection.len() > MAX_SELECTION_ITEMS { + return Err(ApiError::bad_request(format!( + "selection exceeds max items ({MAX_SELECTION_ITEMS})" + ))); + } + for value in selection { + if value.len() > MAX_SELECTION_ITEM_LEN { + return Err(ApiError::bad_request(format!( + "selection item exceeds max length ({MAX_SELECTION_ITEM_LEN})" + ))); + } + } + Ok(()) +} + +fn validate_labels_limits(labels: &HashMap) -> Result<(), ApiError> { + if labels.len() > MAX_LABELS { + return Err(ApiError::bad_request(format!( + "labels exceed max properties ({MAX_LABELS})" + ))); + } + + for (key, value) in labels { + if key.len() > MAX_LABEL_KEY_LEN { + return Err(ApiError::bad_request(format!( + "label key exceeds max length ({MAX_LABEL_KEY_LEN})" + ))); + } + if value.len() > MAX_LABEL_VALUE_LEN { + return Err(ApiError::bad_request(format!( + "label value exceeds max length ({MAX_LABEL_VALUE_LEN})" + ))); + } + } + + Ok(()) +} + +fn validate_partitions_limits(partitions: &[PartitionValue]) -> Result<(), ApiError> { + if partitions.len() > MAX_PARTITIONS { + return Err(ApiError::bad_request(format!( + "partitions exceed max items ({MAX_PARTITIONS})" + ))); + } + + for partition in partitions { + if partition.key.len() > MAX_PARTITION_DIM_LEN { + return Err(ApiError::bad_request(format!( + "partition key exceeds max length ({MAX_PARTITION_DIM_LEN})" + ))); + } + if partition.value.len() > MAX_PARTITION_VALUE_LEN { + return Err(ApiError::bad_request(format!( + "partition value exceeds max length ({MAX_PARTITION_VALUE_LEN})" + ))); + } + } + + Ok(()) +} + +fn validate_trigger_run_request_limits(request: &TriggerRunRequest) -> Result<(), ApiError> { + validate_selection_limits(&request.selection)?; + validate_labels_limits(&request.labels)?; + validate_partitions_limits(&request.partitions)?; + + if let Some(run_key) = request.run_key.as_deref() { + if run_key.len() > MAX_RUN_KEY_LEN { + return Err(ApiError::bad_request(format!( + "runKey exceeds max length ({MAX_RUN_KEY_LEN})" + ))); + } + } + + if let Some(partition_key) = request.partition_key.as_deref() { + if partition_key.len() > MAX_PARTITION_KEY_LEN { + return Err(ApiError::bad_request(format!( + "partitionKey exceeds max length ({MAX_PARTITION_KEY_LEN})" + ))); + } + } + + Ok(()) +} + fn build_task_counts(run: &RunRow, tasks: &[&TaskRow]) -> TaskCounts { let mut pending = 0; let mut queued = 0; @@ -1993,6 +2091,12 @@ fn build_partition_key(partitions: &[PartitionValue]) -> Result, return Ok(None); } + if partitions.len() > MAX_PARTITIONS { + return Err(ApiError::bad_request(format!( + "partitions exceed max items ({MAX_PARTITIONS})" + ))); + } + let mut seen = HashSet::new(); let mut partition_key = arco_core::partition::PartitionKey::new(); @@ -2001,6 +2105,11 @@ fn build_partition_key(partitions: &[PartitionValue]) -> Result, if key.is_empty() { return Err(ApiError::bad_request("partition key cannot be empty")); } + if key.len() > MAX_PARTITION_DIM_LEN { + return Err(ApiError::bad_request(format!( + "partition key exceeds max length ({MAX_PARTITION_DIM_LEN})" + ))); + } if !is_valid_partition_key(key) { return Err(ApiError::bad_request(format!( "invalid partition key: {key}" @@ -2012,13 +2121,26 @@ fn build_partition_key(partitions: &[PartitionValue]) -> Result, ))); } + if partition.value.len() > MAX_PARTITION_VALUE_LEN { + return Err(ApiError::bad_request(format!( + "partition value exceeds max length ({MAX_PARTITION_VALUE_LEN})" + ))); + } + partition_key.insert( key.to_string(), arco_core::partition::ScalarValue::String(partition.value.clone()), ); } - Ok(Some(partition_key.canonical_string())) + let canonical = partition_key.canonical_string(); + if canonical.len() > MAX_PARTITION_KEY_LEN { + return Err(ApiError::bad_request(format!( + "partitionKey exceeds max length ({MAX_PARTITION_KEY_LEN})" + ))); + } + + Ok(Some(canonical)) } fn is_valid_partition_key(key: &str) -> bool { @@ -2595,6 +2717,8 @@ pub(crate) async fn trigger_run( return Err(ApiError::bad_request("selection cannot be empty")); } + validate_trigger_run_request_limits(&request)?; + reject_reserved_lineage_labels(&request.labels)?; // Generate IDs upfront (needed for reservation) @@ -2960,6 +3084,17 @@ pub(crate) async fn rerun_run( Json(request): Json, ) -> Result { ensure_workspace(&ctx, &workspace_id)?; + + validate_labels_limits(&request.labels)?; + validate_selection_limits(&request.selection)?; + if let Some(run_key) = request.run_key.as_deref() { + if run_key.len() > MAX_RUN_KEY_LEN { + return Err(ApiError::bad_request(format!( + "runKey exceeds max length ({MAX_RUN_KEY_LEN})" + ))); + } + } + reject_reserved_lineage_labels(&request.labels)?; let fold_state = load_orchestration_state(&ctx, &state).await?; @@ -3381,6 +3516,8 @@ pub(crate) async fn backfill_run_key( run_key: Some(request.run_key.clone()), labels: request.labels.clone(), }; + + validate_trigger_run_request_limits(&fingerprint_request)?; let request_fingerprint = build_request_fingerprint(&fingerprint_request)?; let backend = state.storage_backend()?; @@ -4885,6 +5022,62 @@ mod tests { assert!(request.partitions.is_empty()); } + #[test] + fn test_trigger_request_limits_rejects_too_long_partition_key() { + let request = TriggerRunRequest { + selection: vec!["analytics/users".to_string()], + include_upstream: false, + include_downstream: false, + partitions: vec![], + partition_key: Some("x".repeat(MAX_PARTITION_KEY_LEN + 1)), + run_key: None, + labels: HashMap::new(), + }; + + let err = validate_trigger_run_request_limits(&request).expect_err("expected error"); + assert_eq!(err.status(), StatusCode::BAD_REQUEST); + assert!(err.message().contains("partitionKey")); + } + + #[test] + fn test_trigger_request_limits_rejects_too_many_selection_items() { + let request = TriggerRunRequest { + selection: vec!["analytics/users".to_string(); MAX_SELECTION_ITEMS + 1], + include_upstream: false, + include_downstream: false, + partitions: vec![], + partition_key: None, + run_key: None, + labels: HashMap::new(), + }; + + let err = validate_trigger_run_request_limits(&request).expect_err("expected error"); + assert_eq!(err.status(), StatusCode::BAD_REQUEST); + assert!(err.message().contains("selection")); + } + + #[test] + fn test_build_partition_key_rejects_canonical_string_too_long() { + let partitions = vec![ + PartitionValue { + key: "a".to_string(), + value: "x".repeat(MAX_PARTITION_VALUE_LEN), + }, + PartitionValue { + key: "b".to_string(), + value: "x".repeat(MAX_PARTITION_VALUE_LEN), + }, + PartitionValue { + key: "c".to_string(), + value: "x".repeat(MAX_PARTITION_VALUE_LEN), + }, + ]; + + let err = build_partition_key(&partitions).expect_err("expected error"); + assert_eq!(err.status(), StatusCode::BAD_REQUEST); + assert!(err.message().contains("partitionKey exceeds max length")); + } + #[test] fn test_resolve_partition_key_rejects_both_partition_key_and_partitions() { let request = TriggerRunRequest { diff --git a/crates/arco-flow/src/orchestration/run_key.rs b/crates/arco-flow/src/orchestration/run_key.rs index 09c3cad..6de9cb7 100644 --- a/crates/arco-flow/src/orchestration/run_key.rs +++ b/crates/arco-flow/src/orchestration/run_key.rs @@ -26,6 +26,7 @@ use bytes::Bytes; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; +use std::time::Duration; use arco_core::{ScopedStorage, WritePrecondition, WriteResult}; @@ -166,8 +167,30 @@ pub async fn reserve_run_key( Ok(ReservationResult::Reserved) } WriteResult::PreconditionFailed { .. } => { - // Read existing reservation - let existing = get_reservation(storage, &reservation.run_key).await?; + // Read existing reservation. + // + // Some object stores can exhibit a brief read-after-write lag in rare cases. + // If the write precondition fails (meaning *some* object exists) but the + // subsequent read returns NotFound, treat it as a retryable race and + // retry the read a handful of times with tight backoff. + let mut existing = get_reservation(storage, &reservation.run_key).await?; + if existing.is_none() { + // Total worst-case sleep ~= 155ms + const READ_RETRIES_MS: [u64; 5] = [5, 10, 20, 40, 80]; + for (attempt, delay_ms) in READ_RETRIES_MS.into_iter().enumerate() { + tracing::debug!( + run_key = %reservation.run_key, + attempt = attempt + 1, + delay_ms, + "run_key reservation not yet visible after precondition failure; retrying read" + ); + tokio::time::sleep(Duration::from_millis(delay_ms)).await; + existing = get_reservation(storage, &reservation.run_key).await?; + if existing.is_some() { + break; + } + } + } match existing { Some(existing) => { // Validate fingerprint consistency with optional cutoff for legacy reservations. @@ -206,7 +229,7 @@ pub async fn reserve_run_key( } None => { // Extremely unlikely: precondition failed but read returned None. - // Could be eventual consistency or deletion race. Retry at caller. + // Could be a deeper storage inconsistency or deletion race. Err(Error::storage(format!( "run_key reservation race: precondition failed but reservation not found: {}", reservation.run_key @@ -263,9 +286,15 @@ pub async fn get_reservation( #[cfg(test)] mod tests { use super::*; - use arco_core::MemoryBackend; - use chrono::Duration; + use arco_core::{ + Error as CoreError, MemoryBackend, ObjectMeta, StorageBackend, WritePrecondition, + }; + use async_trait::async_trait; + use chrono::Duration as ChronoDuration; + use std::ops::Range; use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::Duration; fn make_reservation(run_key: &str) -> RunKeyReservation { RunKeyReservation { @@ -355,6 +384,120 @@ mod tests { Ok(()) } + #[derive(Debug)] + struct FlakyReadBackend { + inner: Arc, + flaky_suffix: String, + remaining_misses: AtomicUsize, + } + + impl FlakyReadBackend { + fn new(inner: Arc, flaky_suffix: String, misses: usize) -> Self { + Self { + inner, + flaky_suffix, + remaining_misses: AtomicUsize::new(misses), + } + } + } + + #[async_trait] + impl StorageBackend for FlakyReadBackend { + async fn get(&self, path: &str) -> arco_core::Result { + if path.ends_with(&self.flaky_suffix) { + let mut current = self.remaining_misses.load(Ordering::Relaxed); + while current > 0 { + match self.remaining_misses.compare_exchange_weak( + current, + current - 1, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => return Err(CoreError::NotFound(path.to_string())), + Err(next) => current = next, + } + } + } + self.inner.get(path).await + } + + async fn get_range(&self, path: &str, range: Range) -> arco_core::Result { + self.inner.get_range(path, range).await + } + + async fn put( + &self, + path: &str, + data: Bytes, + precondition: WritePrecondition, + ) -> arco_core::Result { + self.inner.put(path, data, precondition).await + } + + async fn delete(&self, path: &str) -> arco_core::Result<()> { + self.inner.delete(path).await + } + + async fn list(&self, prefix: &str) -> arco_core::Result> { + self.inner.list(prefix).await + } + + async fn head(&self, path: &str) -> arco_core::Result> { + self.inner.head(path).await + } + + async fn signed_url(&self, path: &str, expiry: Duration) -> arco_core::Result { + self.inner.signed_url(path, expiry).await + } + } + + #[tokio::test] + async fn test_reserve_run_key_precondition_failed_read_retry() -> Result<()> { + let inner = Arc::new(MemoryBackend::new()); + let backend: Arc = Arc::new(FlakyReadBackend::new( + inner, + reservation_path("test-run-key"), + 2, + )); + let storage = ScopedStorage::new(backend, "tenant", "workspace")?; + + let reservation1 = make_reservation("test-run-key"); + let result1 = + reserve_run_key(&storage, &reservation1, FingerprintPolicy::lenient()).await?; + assert!(matches!(result1, ReservationResult::Reserved)); + + let reservation2 = make_reservation("test-run-key"); + let result2 = + reserve_run_key(&storage, &reservation2, FingerprintPolicy::lenient()).await?; + assert!(matches!(result2, ReservationResult::AlreadyExists(_))); + + Ok(()) + } + + #[tokio::test] + async fn test_reserve_run_key_precondition_failed_read_retry_is_bounded() -> Result<()> { + let inner = Arc::new(MemoryBackend::new()); + let backend: Arc = Arc::new(FlakyReadBackend::new( + inner, + reservation_path("test-run-key"), + 100, + )); + let storage = ScopedStorage::new(backend, "tenant", "workspace")?; + + let reservation1 = make_reservation("test-run-key"); + let result1 = + reserve_run_key(&storage, &reservation1, FingerprintPolicy::lenient()).await?; + assert!(matches!(result1, ReservationResult::Reserved)); + + let reservation2 = make_reservation("test-run-key"); + let err = reserve_run_key(&storage, &reservation2, FingerprintPolicy::lenient()) + .await + .expect_err("expected error"); + assert!(err.to_string().contains("precondition failed")); + + Ok(()) + } + #[tokio::test] async fn test_get_reservation_not_found() -> Result<()> { let backend = Arc::new(MemoryBackend::new()); @@ -499,7 +642,7 @@ mod tests { // First reservation with no fingerprint (simulates old reservation) let mut reservation1 = make_reservation("test-run-key"); reservation1.request_fingerprint = None; - reservation1.created_at = cutoff - Duration::hours(1); + reservation1.created_at = cutoff - ChronoDuration::hours(1); let result1 = reserve_run_key(&storage, &reservation1, policy).await?; assert!(matches!(result1, ReservationResult::Reserved)); @@ -526,7 +669,7 @@ mod tests { // First reservation with no fingerprint created AFTER cutoff let mut reservation1 = make_reservation("test-run-key"); reservation1.request_fingerprint = None; - reservation1.created_at = cutoff + Duration::hours(1); + reservation1.created_at = cutoff + ChronoDuration::hours(1); let result1 = reserve_run_key(&storage, &reservation1, policy).await?; assert!(matches!(result1, ReservationResult::Reserved)); diff --git a/crates/arco-flow/tests/property_tests.rs b/crates/arco-flow/tests/property_tests.rs index 9f498ed..1270f64 100644 --- a/crates/arco-flow/tests/property_tests.rs +++ b/crates/arco-flow/tests/property_tests.rs @@ -303,6 +303,12 @@ proptest! { Err(arco_flow::error::Error::DependencyNotFound { .. }) => { // Valid rejection - missing deps correctly detected } + Err(arco_flow::error::Error::PlanGenerationFailed { message }) + if message.contains("duplicate TaskKey") || message.contains("duplicate task_id") => + { + // Valid rejection - input contains duplicate semantic tasks or duplicate IDs. + // (These inputs are possible in the proptest generator and should be rejected.) + } Err(e) => { prop_assert!(false, "Unexpected error: {e:?}"); }