diff --git a/backend/migrations/20260627000000_add_content_integrity_anchors.sql b/backend/migrations/20260627000000_add_content_integrity_anchors.sql
new file mode 100644
index 00000000..fb6120eb
--- /dev/null
+++ b/backend/migrations/20260627000000_add_content_integrity_anchors.sql
@@ -0,0 +1,47 @@
+-- Decentralized content integrity anchors (IPFS / Arweave CAS registry)
+-- Files are stored directly on decentralized networks; this table tracks
+-- anchors for periodic tamper verification (no central file silo).
+
+-- One row per distinct content hash. byte_size is capped at 52428800 bytes
+-- (50 MiB), the same limit the backend service enforces as MAX_FILE_BYTES.
+CREATE TABLE IF NOT EXISTS content_anchors (
+    content_hash TEXT PRIMARY KEY,
+    cid TEXT NOT NULL,
+    storage_backend TEXT NOT NULL CHECK (storage_backend IN ('ipfs', 'arweave')),
+    product_id TEXT,
+    byte_size BIGINT NOT NULL CHECK (byte_size > 0 AND byte_size <= 52428800),
+    mime_type TEXT,
+    anchored_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    anchored_by TEXT,
+    last_verified_at TIMESTAMPTZ,
+    verification_status TEXT NOT NULL DEFAULT 'pending'
+        CHECK (verification_status IN ('pending', 'verified', 'tampered', 'unavailable')),
+    tamper_alert_sent BOOLEAN NOT NULL DEFAULT FALSE,
+    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
+);
+
+CREATE INDEX IF NOT EXISTS idx_content_anchors_product_id ON content_anchors(product_id);
+-- Supports selecting the next verification batch (never-verified rows first).
+CREATE INDEX IF NOT EXISTS idx_content_anchors_verification
+    ON content_anchors(verification_status, last_verified_at NULLS FIRST);
+
+-- One row per detected hash mismatch; rows are removed together with their anchor.
+CREATE TABLE IF NOT EXISTS content_tamper_alerts (
+    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+    content_hash TEXT NOT NULL REFERENCES content_anchors(content_hash) ON DELETE CASCADE,
+    expected_hash TEXT NOT NULL,
+    actual_hash TEXT,
+    cid TEXT NOT NULL,
+    storage_backend TEXT NOT NULL,
+    product_id TEXT,
+    detected_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    alert_sent BOOLEAN NOT NULL DEFAULT FALSE
+);
+
+CREATE INDEX IF NOT EXISTS idx_content_tamper_alerts_detected_at
+    ON content_tamper_alerts(detected_at DESC);
+-- PostgreSQL does not index referencing columns automatically; without this
+-- index the ON DELETE CASCADE above has to scan the whole alerts table.
+CREATE INDEX IF NOT EXISTS idx_content_tamper_alerts_content_hash
+    ON content_tamper_alerts(content_hash);
diff --git 
a/backend/src/handlers.rs b/backend/src/handlers.rs index de713e06..3ba61eec 100644 --- a/backend/src/handlers.rs +++ b/backend/src/handlers.rs @@ -21,3 +21,4 @@ pub mod batch; // pub mod supplier; // Temporarily disabled pub mod predictive_routing; pub mod physics_model; +pub mod storage;
diff --git a/backend/src/handlers/storage.rs b/backend/src/handlers/storage.rs
new file mode 100644
index 00000000..75844094
--- /dev/null
+++ b/backend/src/handlers/storage.rs
@@ -0,0 +1,205 @@
+use axum::{
+    extract::{Path, State},
+    http::StatusCode,
+    response::Json,
+};
+use serde::{Deserialize, Serialize};
+use utoipa::ToSchema;
+
+use crate::{
+    error::AppError,
+    services::storage_integrity_service::{RegisterAnchorRequest, MAX_FILE_BYTES},
+    AppState,
+};
+
+/// JSON view of a stored anchor returned by the storage endpoints.
+#[derive(Debug, Serialize, ToSchema)]
+pub struct AnchorResponse {
+    pub content_hash: String,
+    pub cid: String,
+    pub storage_backend: String,
+    pub product_id: Option<String>,
+    pub byte_size: i64,
+    pub mime_type: Option<String>,
+    pub verification_status: String,
+    pub anchored_at: chrono::DateTime<chrono::Utc>,
+}
+
+/// Result of the CAS existence check.
+#[derive(Debug, Serialize, ToSchema)]
+pub struct ExistsResponse {
+    pub content_hash: String,
+    pub exists: bool,
+}
+
+/// Request body for registering an anchor.
+///
+/// `product_id` and `anchored_by` also accept their camelCase spellings
+/// (`productId`, `anchoredBy`).
+#[derive(Debug, Deserialize, ToSchema)]
+pub struct RegisterAnchorBody {
+    pub content_hash: String,
+    pub cid: String,
+    pub storage_backend: String,
+    #[serde(alias = "productId")]
+    pub product_id: Option<String>,
+    pub byte_size: u64,
+    pub mime_type: Option<String>,
+    #[serde(alias = "anchoredBy")]
+    pub anchored_by: Option<String>,
+}
+
+/// Copies the API-visible subset of a stored anchor into an [`AnchorResponse`].
+fn to_response(anchor: crate::services::storage_integrity_service::ContentAnchor) -> AnchorResponse {
+    AnchorResponse {
+        content_hash: anchor.content_hash,
+        cid: anchor.cid,
+        storage_backend: anchor.storage_backend,
+        product_id: anchor.product_id,
+        byte_size: anchor.byte_size,
+        mime_type: anchor.mime_type,
+        verification_status: anchor.verification_status,
+        anchored_at: anchor.anchored_at,
+    }
+}
+
+/// Register a content integrity anchor (CAS — duplicate hash + CID is idempotent).
+#[utoipa::path(
+    post,
+    path = "/api/v1/storage/anchors",
+    request_body = RegisterAnchorBody,
+    responses(
+        (status = 201, description = "Anchor registered", body = AnchorResponse),
+        (status = 400, description = "Invalid request"),
+        (status = 409, description = "Hash mismatch")
+    ),
+    tag = "storage"
+)]
+pub async fn register_anchor(
+    State(state): State<AppState>,
+    Json(body): Json<RegisterAnchorBody>,
+) -> Result<(StatusCode, Json<AnchorResponse>), AppError> {
+    if body.byte_size == 0 || body.byte_size > MAX_FILE_BYTES {
+        return Err(AppError::BadRequest(format!(
+            "byte_size must be between 1 and {}",
+            MAX_FILE_BYTES
+        )));
+    }
+
+    // A length check alone would accept any 64-character string; the digest
+    // must also consist of hex digits only.
+    let is_sha256_hex = body.content_hash.len() == 64
+        && body.content_hash.bytes().all(|b| b.is_ascii_hexdigit());
+    if !is_sha256_hex {
+        return Err(AppError::BadRequest(
+            "content_hash must be a 64-character hex SHA-256 digest".into(),
+        ));
+    }
+
+    if body.cid.trim().is_empty() {
+        return Err(AppError::BadRequest("cid must not be empty".into()));
+    }
+
+    let req = RegisterAnchorRequest {
+        // Stored lowercase so lookups by hash are case-insensitive.
+        content_hash: body.content_hash.to_lowercase(),
+        cid: body.cid,
+        storage_backend: body.storage_backend,
+        product_id: body.product_id,
+        byte_size: body.byte_size,
+        mime_type: body.mime_type,
+        anchored_by: body.anchored_by,
+    };
+
+    let anchor = state
+        .storage_integrity_service
+        .register_anchor(&req)
+        .await
+        .map_err(|e| {
+            // The service reports its own validation failures as
+            // `sqlx::Error::Protocol`. Only those are classified by message;
+            // every other error is a real database failure.
+            let (conflict, invalid) = match &e {
+                sqlx::Error::Protocol(msg) => {
+                    (msg.contains("different CID"), msg.contains("must be"))
+                }
+                _ => (false, false),
+            };
+            if conflict {
+                AppError::AlreadyExists(e.to_string())
+            } else if invalid {
+                AppError::BadRequest(e.to_string())
+            } else {
+                AppError::Database(e)
+            }
+        })?;
+
+    Ok((StatusCode::CREATED, Json(to_response(anchor))))
+}
+
+/// CAS existence check for content-hash deduplication before upload.
+#[utoipa::path(
+    get,
+    path = "/api/v1/storage/exists/{content_hash}",
+    params(("content_hash" = String, Path, description = "SHA-256 hex digest")),
+    responses((status = 200, description = "Existence result", body = ExistsResponse)),
+    tag = "storage"
+)]
+pub async fn check_exists(
+    State(state): State<AppState>,
+    Path(content_hash): Path<String>,
+) -> Result<Json<ExistsResponse>, AppError> {
+    // The lookup uses the lowercase form; the response echoes the caller's
+    // original spelling.
+    let exists = state
+        .storage_integrity_service
+        .exists(&content_hash.to_lowercase())
+        .await
+        .map_err(AppError::Database)?;
+
+    Ok(Json(ExistsResponse {
+        content_hash,
+        exists,
+    }))
+}
+
+/// Get anchor metadata by content hash.
+#[utoipa::path(
+    get,
+    path = "/api/v1/storage/anchors/{content_hash}",
+    params(("content_hash" = String, Path, description = "SHA-256 hex digest")),
+    responses(
+        (status = 200, description = "Anchor found", body = AnchorResponse),
+        (status = 404, description = "Not found")
+    ),
+    tag = "storage"
+)]
+pub async fn get_anchor(
+    State(state): State<AppState>,
+    Path(content_hash): Path<String>,
+) -> Result<Json<AnchorResponse>, AppError> {
+    let anchor = state
+        .storage_integrity_service
+        .get_anchor(&content_hash.to_lowercase())
+        .await
+        .map_err(AppError::Database)?
+        // A missing row becomes a 404 rather than an empty body.
+        .ok_or_else(|| AppError::NotFound("Anchor not found".into()))?;
+
+    Ok(Json(to_response(anchor)))
+}
+
+/// List anchors for a product.
+#[utoipa::path(
+    get,
+    path = "/api/v1/storage/products/{product_id}/anchors",
+    params(("product_id" = String, Path, description = "Product identifier")),
+    responses((status = 200, description = "Product anchors", body = [AnchorResponse])),
+    tag = "storage"
+)]
+pub async fn list_product_anchors(
+    State(state): State<AppState>,
+    Path(product_id): Path<String>,
+) -> Result<Json<Vec<AnchorResponse>>, AppError> {
+    let anchors = state
+        .storage_integrity_service
+        .list_product_anchors(&product_id)
+        .await
+        .map_err(AppError::Database)?;
+
+    Ok(Json(anchors.into_iter().map(to_response).collect()))
+}
+
+/// Trigger on-demand verification (admin/cron hook).
+///
+/// Runs one verification batch before responding and reports how many anchors
+/// in that batch were found tampered.
+#[utoipa::path(
+    post,
+    path = "/api/v1/storage/verify",
+    responses((status = 200, description = "Verification batch completed")),
+    tag = "storage"
+)]
+pub async fn trigger_verification(
+    State(state): State<AppState>,
+) -> Result<Json<serde_json::Value>, AppError> {
+    let tampered = state
+        .storage_integrity_service
+        .verify_pending_anchors()
+        .await
+        .map_err(AppError::Database)?;
+
+    Ok(Json(serde_json::json!({
+        "status": "completed",
+        "tampered_count": tampered
+    })))
+}
diff --git a/backend/src/main.rs b/backend/src/main.rs index 9db0c062..7beb26a1 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -43,7 +43,8 @@ use services::{ CollaborationService, EventService, FinancialService, PredictiveRoutingService, ProductService, RecallService, SyncService, UserService, CollaborationService, EventService, FinancialService, IoTService, PhysicsModelService, PredictiveRoutingService, - ProductService, QualityService, RecallService, RegulatoryService, SupplierService, + ProductService, QualityService, RecallService, RegulatoryService, StorageConfig, + StorageIntegrityService, SupplierService, SyncService, UserService, MercuryIndexer, MercuryConfig, RuleEngine, SagaManager, RedisWorkerPool, WorkerConfig, TrackingEventProcessor, get_default_rules, get_product_registration_saga, NoopAction, EventProcessingHandler, RuleEvaluationHandler, @@ -82,6 +83,7 @@ pub struct AppState { pub rule_engine: Arc, pub saga_manager: Arc, pub worker_pool: Arc, + pub storage_integrity_service: Arc, } impl AppState { @@ -120,6 +122,11 @@ impl AppState { let predictive_routing_service = Arc::new(PredictiveRoutingService::new(db.pool().clone())); let physics_model_service = Arc::new(PhysicsModelService::new(db.pool().clone())); + let storage_integrity_service = Arc::new(StorageIntegrityService::new( + db.pool().clone(), + redis_client.clone(), + StorageConfig::default(), + )); // Initialize Mercury streaming indexer let mercury_config = MercuryConfig::default(); @@ -193,6 +200,7 @@ impl AppState { 
rule_engine, saga_manager, worker_pool, + storage_integrity_service, }) } } diff --git a/backend/src/routes.rs b/backend/src/routes.rs index 624a1d41..a94af7ba 100644 --- a/backend/src/routes.rs +++ b/backend/src/routes.rs @@ -19,6 +19,7 @@ pub fn api_routes() -> Router { .nest("/api/v1/monitoring", monitoring_routes()) .nest("/api/v1/collaboration", collaboration_routes()) .nest("/api/v1/routing", routing_routes()) + .nest("/api/v1/storage", storage_routes()) // .nest("/api/v1/iot", iot_routes()) // Temporarily disabled // .nest("/api/v1/quality", quality_routes()) // Temporarily disabled // .nest("/api/v1/regulatory", regulatory_routes()) // Temporarily disabled @@ -380,3 +381,33 @@ fn physics_routes() -> Router { crate::middleware::rate_limit::rate_limit_middleware, )) }
+
+/// Routes for the content integrity anchor registry, nested under
+/// `/api/v1/storage` by `api_routes`.
+///
+/// Every route is wrapped by the `api_key_auth` and rate-limit layers;
+/// `/verify` additionally carries a role layer for `Administrator` and
+/// `Auditor`.
+fn storage_routes() -> Router {
+    Router::new()
+        .route(
+            "/anchors",
+            post(crate::handlers::storage::register_anchor),
+        )
+        .route(
+            "/anchors/:content_hash",
+            get(crate::handlers::storage::get_anchor),
+        )
+        .route(
+            "/exists/:content_hash",
+            get(crate::handlers::storage::check_exists),
+        )
+        .route(
+            "/products/:product_id/anchors",
+            get(crate::handlers::storage::list_product_anchors),
+        )
+        .route(
+            "/verify",
+            // The role layer is attached to this handler only, not to the router.
+            post(crate::handlers::storage::trigger_verification).layer(middleware::from_fn(
+                require_role(vec![UserRole::Administrator, UserRole::Auditor]),
+            )),
+        )
+        // Router-level layers: these apply to all routes registered above.
+        .layer(middleware::from_fn(api_key_auth))
+        .layer(middleware::from_fn(
+            crate::middleware::rate_limit::rate_limit_middleware,
+        ))
+}
diff --git a/backend/src/services.rs b/backend/src/services.rs index 62442621..974d7b6a 100644 --- a/backend/src/services.rs +++ b/backend/src/services.rs @@ -69,3 +69,9 @@ pub use physics_model_service::PhysicsModelService; pub mod iot_twin_sync_service; pub use iot_twin_sync_service::IoTTwinSyncService; + +pub mod storage_integrity_service; +pub use storage_integrity_service::{ + ContentAnchor, RegisterAnchorRequest, StorageBackend, StorageConfig, + StorageIntegrityService, 
TamperAlert, VerificationStatus, MAX_FILE_BYTES, +};
diff --git a/backend/src/services/storage_integrity_service.rs b/backend/src/services/storage_integrity_service.rs
new file mode 100644
index 00000000..fe427887
--- /dev/null
+++ b/backend/src/services/storage_integrity_service.rs
@@ -0,0 +1,428 @@
+use chrono::{DateTime, Utc};
+use redis::AsyncCommands;
+use serde::{Deserialize, Serialize};
+use sha2::{Digest, Sha256};
+use sqlx::PgPool;
+use tracing::{error, info, warn};
+
+/// Maximum file size supported for decentralized storage (50 MiB).
+pub const MAX_FILE_BYTES: u64 = 50 * 1024 * 1024;
+
+/// Decentralized network an anchored file is stored on.
+#[derive(Debug, Clone, Serialize, Deserialize, sqlx::Type, PartialEq, Eq)]
+#[sqlx(type_name = "TEXT", rename_all = "lowercase")]
+pub enum StorageBackend {
+    Ipfs,
+    Arweave,
+}
+
+impl StorageBackend {
+    /// Lowercase name of the backend (`"ipfs"` or `"arweave"`).
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            StorageBackend::Ipfs => "ipfs",
+            StorageBackend::Arweave => "arweave",
+        }
+    }
+
+    /// Parses a backend name case-insensitively; returns `None` for any other value.
+    pub fn from_str(s: &str) -> Option<Self> {
+        match s.to_lowercase().as_str() {
+            "ipfs" => Some(StorageBackend::Ipfs),
+            "arweave" => Some(StorageBackend::Arweave),
+            _ => None,
+        }
+    }
+}
+
+/// Outcome of the most recent integrity check of an anchor.
+#[derive(Debug, Clone, Serialize, Deserialize, sqlx::Type, PartialEq, Eq)]
+#[sqlx(type_name = "TEXT", rename_all = "lowercase")]
+pub enum VerificationStatus {
+    Pending,
+    Verified,
+    Tampered,
+    Unavailable,
+}
+
+impl VerificationStatus {
+    /// Lowercase name of the status as written to `verification_status`.
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            VerificationStatus::Pending => "pending",
+            VerificationStatus::Verified => "verified",
+            VerificationStatus::Tampered => "tampered",
+            VerificationStatus::Unavailable => "unavailable",
+        }
+    }
+}
+
+/// A row of the `content_anchors` table.
+#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
+pub struct ContentAnchor {
+    pub content_hash: String,
+    pub cid: String,
+    pub storage_backend: String,
+    pub product_id: Option<String>,
+    pub byte_size: i64,
+    pub mime_type: Option<String>,
+    pub anchored_at: DateTime<Utc>,
+    pub anchored_by: Option<String>,
+    pub last_verified_at: Option<DateTime<Utc>>,
+    pub verification_status: String,
+    pub tamper_alert_sent: bool,
+    pub created_at: DateTime<Utc>,
+    pub updated_at: DateTime<Utc>,
+}
+
+/// Input to [`StorageIntegrityService::register_anchor`].
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RegisterAnchorRequest {
+    pub content_hash: String,
+    pub cid: String,
+    pub storage_backend: String,
+    pub product_id: Option<String>,
+    pub byte_size: u64,
+    pub mime_type: Option<String>,
+    pub anchored_by: Option<String>,
+}
+
+/// Payload published on the `storage:tamper_alerts` channel.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct TamperAlert {
+    pub content_hash: String,
+    pub cid: String,
+    pub storage_backend: String,
+    pub product_id: Option<String>,
+    pub expected_hash: String,
+    pub actual_hash: Option<String>,
+    pub detected_at: DateTime<Utc>,
+}
+
+/// Gateways and batch size used for periodic verification.
+#[derive(Debug, Clone)]
+pub struct StorageConfig {
+    pub ipfs_gateway: String,
+    pub arweave_gateway: String,
+    pub verification_batch_size: i64,
+}
+
+impl Default for StorageConfig {
+    /// Reads `IPFS_GATEWAY`, `ARWEAVE_GATEWAY` and `STORAGE_VERIFY_BATCH_SIZE`
+    /// from the environment. Falls back to the public gateways and a batch
+    /// size of 50 when a variable is unset or does not parse.
+    fn default() -> Self {
+        Self {
+            ipfs_gateway: std::env::var("IPFS_GATEWAY")
+                .unwrap_or_else(|_| "https://ipfs.io/ipfs/".to_string()),
+            arweave_gateway: std::env::var("ARWEAVE_GATEWAY")
+                .unwrap_or_else(|_| "https://arweave.net/".to_string()),
+            verification_batch_size: std::env::var("STORAGE_VERIFY_BATCH_SIZE")
+                .ok()
+                .and_then(|v| v.parse().ok())
+                .unwrap_or(50),
+        }
+    }
+}
+
+/// Result of comparing fetched content against its anchored hash.
+enum VerificationOutcome {
+    Match,
+    Tampered { actual_hash: String },
+}
+
+/// Registry of content anchors plus the periodic tamper verification job.
+pub struct StorageIntegrityService {
+    pool: PgPool,
+    redis_client: redis::Client,
+    config: StorageConfig,
+    http: reqwest::Client,
+}
+
+impl StorageIntegrityService {
+    /// Builds the service with an HTTP client that times out after 120 seconds.
+    pub fn new(pool: PgPool, redis_client: redis::Client, config: StorageConfig) -> Self {
+        let http = reqwest::Client::builder()
+            .timeout(std::time::Duration::from_secs(120))
+            .build()
+            .unwrap_or_else(|e| {
+                // Keep the service usable, but make the lost timeout visible
+                // instead of silently falling back.
+                warn!(
+                    error = %e,
+                    "Failed to build HTTP client with timeout; using default client"
+                );
+                reqwest::Client::default()
+            });
+
+        Self {
+            pool,
+            redis_client,
+            config,
+            http,
+        }
+    }
+
+    /// CAS lookup — returns true when this content hash is already registered.
+    pub async fn exists(&self, content_hash: &str) -> Result<bool, sqlx::Error> {
+        // EXISTS yields exactly one boolean row. A bare `SELECT 1` is an INT4
+        // column, which sqlx refuses to decode into an `i64`.
+        let (found,): (bool,) = sqlx::query_as(
+            "SELECT EXISTS(SELECT 1 FROM content_anchors WHERE content_hash = $1)",
+        )
+        .bind(content_hash)
+        .fetch_one(&self.pool)
+        .await?;
+        Ok(found)
+    }
+
+    /// Fetches the anchor row for `content_hash`, or `None` when it is not registered.
+    pub async fn get_anchor(
+        &self,
+        content_hash: &str,
+    ) -> Result<Option<ContentAnchor>, sqlx::Error> {
+        sqlx::query_as::<_, ContentAnchor>(
+            "SELECT * FROM content_anchors WHERE content_hash = $1",
+        )
+        .bind(content_hash)
+        .fetch_optional(&self.pool)
+        .await
+    }
+
+    /// Register an anchor (idempotent CAS — same hash + cid is a no-op).
+    ///
+    /// # Errors
+    ///
+    /// Returns `sqlx::Error::Protocol` when `byte_size` is outside
+    /// `1..=MAX_FILE_BYTES`, when `storage_backend` is neither `ipfs` nor
+    /// `arweave`, or when the hash is already anchored with a different CID
+    /// or backend. Database failures are returned unchanged.
+    pub async fn register_anchor(
+        &self,
+        req: &RegisterAnchorRequest,
+    ) -> Result<ContentAnchor, sqlx::Error> {
+        if req.byte_size == 0 || req.byte_size > MAX_FILE_BYTES {
+            return Err(sqlx::Error::Protocol(
+                "byte_size must be between 1 and 52428800".into(),
+            ));
+        }
+
+        // Canonical lowercase backend name. The same value is stored and used
+        // for the idempotency comparison below, so "IPFS" and "ipfs" match.
+        let backend = match StorageBackend::from_str(&req.storage_backend) {
+            Some(parsed) => parsed.as_str(),
+            None => {
+                return Err(sqlx::Error::Protocol(
+                    "storage_backend must be ipfs or arweave".into(),
+                ));
+            }
+        };
+
+        // Insert first and let the primary key decide. Two concurrent
+        // registrations of the same hash then cannot race into a
+        // unique-violation error.
+        let inserted = sqlx::query_as::<_, ContentAnchor>(
+            r#"
+            INSERT INTO content_anchors (
+                content_hash, cid, storage_backend, product_id, byte_size,
+                mime_type, anchored_by, verification_status
+            )
+            VALUES ($1, $2, $3, $4, $5, $6, $7, 'pending')
+            ON CONFLICT (content_hash) DO NOTHING
+            RETURNING *
+            "#,
+        )
+        .bind(&req.content_hash)
+        .bind(&req.cid)
+        .bind(backend)
+        .bind(&req.product_id)
+        // Cannot truncate: byte_size was checked against MAX_FILE_BYTES above.
+        .bind(req.byte_size as i64)
+        .bind(&req.mime_type)
+        .bind(&req.anchored_by)
+        .fetch_optional(&self.pool)
+        .await?;
+
+        if let Some(anchor) = inserted {
+            return Ok(anchor);
+        }
+
+        // No row came back, so the hash was already anchored.
+        match self.get_anchor(&req.content_hash).await? {
+            Some(existing) if existing.cid == req.cid && existing.storage_backend == backend => {
+                Ok(existing)
+            }
+            Some(_) => Err(sqlx::Error::Protocol(
+                "content_hash already anchored with different CID".into(),
+            )),
+            // The conflicting row was deleted between the two statements.
+            None => Err(sqlx::Error::RowNotFound),
+        }
+    }
+
+    /// Lists every anchor attached to `product_id`, most recently anchored first.
+    pub async fn list_product_anchors(
+        &self,
+        product_id: &str,
+    ) -> Result<Vec<ContentAnchor>, sqlx::Error> {
+        sqlx::query_as::<_, ContentAnchor>(
+            "SELECT * FROM content_anchors WHERE product_id = $1 ORDER BY anchored_at DESC",
+        )
+        .bind(product_id)
+        .fetch_all(&self.pool)
+        .await
+    }
+
+    /// Periodic verification: fetch from decentralized storage and compare hashes.
+    ///
+    /// Processes at most `verification_batch_size` anchors, least recently
+    /// verified first, and returns how many were found tampered. A database
+    /// error aborts the batch.
+    pub async fn verify_pending_anchors(&self) -> Result<usize, sqlx::Error> {
+        // 'unavailable' anchors are selected again as well, so one failed
+        // gateway fetch does not exclude an anchor from verification forever.
+        let anchors: Vec<ContentAnchor> = sqlx::query_as::<_, ContentAnchor>(
+            r#"
+            SELECT * FROM content_anchors
+            WHERE verification_status IN ('pending', 'verified', 'unavailable')
+            ORDER BY last_verified_at NULLS FIRST, anchored_at ASC
+            LIMIT $1
+            "#,
+        )
+        .bind(self.config.verification_batch_size)
+        .fetch_all(&self.pool)
+        .await?;
+
+        let mut tamper_count = 0usize;
+
+        for anchor in anchors {
+            match self.verify_single(&anchor).await {
+                Ok(VerificationOutcome::Match) => {
+                    self.mark_checked(&anchor.content_hash, VerificationStatus::Verified)
+                        .await?;
+                }
+                Ok(VerificationOutcome::Tampered { actual_hash }) => {
+                    tamper_count += 1;
+                    self.record_tamper(&anchor, Some(actual_hash)).await?;
+                }
+                Err(e) => {
+                    warn!(
+                        content_hash = %anchor.content_hash,
+                        error = %e,
+                        "Content unavailable for verification"
+                    );
+                    self.mark_checked(&anchor.content_hash, VerificationStatus::Unavailable)
+                        .await?;
+                }
+            }
+        }
+
+        if tamper_count > 0 {
+            info!("Tamper detection: {} anchor(s) failed verification", tamper_count);
+        }
+
+        Ok(tamper_count)
+    }
+
+    /// Stores `status` for an anchor and stamps the time of the check.
+    async fn mark_checked(
+        &self,
+        content_hash: &str,
+        status: VerificationStatus,
+    ) -> Result<(), sqlx::Error> {
+        sqlx::query(
+            r#"
+            UPDATE content_anchors
+            SET verification_status = $2,
+                last_verified_at = NOW(),
+                updated_at = NOW()
+            WHERE content_hash = $1
+            "#,
+        )
+        .bind(content_hash)
+        .bind(status.as_str())
+        .execute(&self.pool)
+        .await?;
+        Ok(())
+    }
+
+    /// Downloads the anchored content and compares its SHA-256 with the anchor.
+    ///
+    /// Content larger than `MAX_FILE_BYTES` is reported as tampered regardless
+    /// of its hash. A fetch failure is returned as `Err`.
+    async fn verify_single(
+        &self,
+        anchor: &ContentAnchor,
+    ) -> Result<VerificationOutcome, reqwest::Error> {
+        let bytes = self.fetch_content(anchor).await?;
+        let actual = hex::encode(Sha256::digest(&bytes));
+        let expected = anchor.content_hash.trim_start_matches("0x").to_lowercase();
+
+        if bytes.len() as u64 > MAX_FILE_BYTES || actual != expected {
+            Ok(VerificationOutcome::Tampered { actual_hash: actual })
+        } else {
+            Ok(VerificationOutcome::Match)
+        }
+    }
+
+    /// Fetches the raw bytes for an anchor from the gateway of its backend.
+    async fn fetch_content(&self, anchor: &ContentAnchor) -> Result<Vec<u8>, reqwest::Error> {
+        let gateway = match anchor.storage_backend.as_str() {
+            "ipfs" => &self.config.ipfs_gateway,
+            "arweave" => &self.config.arweave_gateway,
+            // Any other backend name yields empty content rather than an error.
+            _ => return Ok(Vec::new()),
+        };
+        // Exactly one slash between gateway and CID, however each is written.
+        let url = format!(
+            "{}/{}",
+            gateway.trim_end_matches('/'),
+            anchor.cid.trim_start_matches('/')
+        );
+
+        let response = self.http.get(&url).send().await?;
+        response.error_for_status()?.bytes().await.map(|b| b.to_vec())
+    }
+
+    /// Marks the anchor as tampered, stores an alert row and publishes the alert.
+    ///
+    /// Publishing is best effort: a Redis failure is logged and does not fail
+    /// the call.
+    ///
+    /// NOTE(review): both "alert sent" flags are written as TRUE before the
+    /// publish is attempted — confirm whether they should reflect delivery.
+    async fn record_tamper(
+        &self,
+        anchor: &ContentAnchor,
+        actual_hash: Option<String>,
+    ) -> Result<(), sqlx::Error> {
+        // One statement, so the status change and the alert row are written
+        // atomically.
+        sqlx::query(
+            r#"
+            WITH flagged AS (
+                UPDATE content_anchors
+                SET verification_status = 'tampered',
+                    tamper_alert_sent = TRUE,
+                    last_verified_at = NOW(),
+                    updated_at = NOW()
+                WHERE content_hash = $1
+                RETURNING content_hash
+            )
+            INSERT INTO content_tamper_alerts (
+                content_hash, expected_hash, actual_hash, cid,
+                storage_backend, product_id, alert_sent
+            )
+            SELECT content_hash, content_hash, $2, $3, $4, $5, TRUE
+            FROM flagged
+            "#,
+        )
+        .bind(&anchor.content_hash)
+        .bind(&actual_hash)
+        .bind(&anchor.cid)
+        .bind(&anchor.storage_backend)
+        .bind(&anchor.product_id)
+        .execute(&self.pool)
+        .await?;
+
+        let alert = TamperAlert {
+            content_hash: anchor.content_hash.clone(),
+            cid: anchor.cid.clone(),
+            storage_backend: anchor.storage_backend.clone(),
+            product_id: anchor.product_id.clone(),
+            expected_hash: anchor.content_hash.clone(),
+            actual_hash,
+            detected_at: Utc::now(),
+        };
+
+        if let Err(e) = self.publish_tamper_alert(&alert).await {
+            error!("Failed to publish tamper alert: {}", e);
+        }
+
+        Ok(())
+    }
+
+    /// Publishes the alert as JSON on the `storage:tamper_alerts` Redis channel.
+    async fn publish_tamper_alert(&self, alert: &TamperAlert) -> Result<(), redis::RedisError> {
+        let payload = serde_json::to_string(alert).map_err(|e| {
+            redis::RedisError::from((
+                redis::ErrorKind::TypeError,
+                "serialization",
+                e.to_string(),
+            ))
+        })?;
+
+        // A failed connection is returned to the caller, which logs it,
+        // rather than being dropped without a trace.
+        let mut conn = self.redis_client.get_multiplexed_tokio_connection().await?;
+        let _: () = conn
+            .publish("storage:tamper_alerts", payload)
+            .await?;
+        info!(
+            "Tamper alert published for content_hash={} cid={}",
+            alert.content_hash, alert.cid
+        );
+        Ok(())
+    }
+}
+
+impl Clone for StorageIntegrityService {
+    fn clone(&self) -> Self {
+        Self {
+            pool: self.pool.clone(),
+            redis_client: self.redis_client.clone(),
+            config: self.config.clone(),
+            http: self.http.clone(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn max_file_size_is_50mb() {
+        assert_eq!(MAX_FILE_BYTES, 52_428_800);
+    }
+
+    #[test]
+    fn storage_backend_parsing() {
+        assert_eq!(StorageBackend::from_str("ipfs"), Some(StorageBackend::Ipfs));
+        assert_eq!(
+            StorageBackend::from_str("ARWEAVE"),
+            Some(StorageBackend::Arweave)
+        );
+        assert!(StorageBackend::from_str("s3").is_none());
+    }
+}
diff --git a/backend/src/utils.rs b/backend/src/utils.rs index c5bfc681..870d1d7a 100644 --- a/backend/src/utils.rs +++ b/backend/src/utils.rs @@ -1,4 +1,4 @@ -use crate::services::{ApiKeyService, EventService, ProductService, SyncService}; +use crate::services::{ApiKeyService, EventService, ProductService, StorageIntegrityService, SyncService}; use chrono::{DateTime, Utc}; use sqlx::PgPool; use std::time::Duration; @@ -100,15 +100,19 @@ pub struct CronService { redis_client: redis::Client, backup_service: BackupService, sync_service: SyncService, + storage_integrity_service: StorageIntegrityService, } impl CronService { pub fn new(pool: PgPool, redis_client: redis::Client) -> Self { + let storage_integrity_service = + StorageIntegrityService::new(pool.clone(), redis_client.clone(), Default::default()); Self { pool: pool.clone(), redis_client: redis_client.clone(), backup_service: BackupService::new(pool.clone()), sync_service: SyncService::new(pool, redis_client), + storage_integrity_service, } } @@ -170,6 +174,25 @@ impl CronService { } }); + // Periodic decentralized storage 
integrity verification (every 15 minutes) + let storage_service = self.storage_integrity_service.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(900)); + loop { + interval.tick().await; + match storage_service.verify_pending_anchors().await { + Ok(tampered) if tampered > 0 => { + tracing::warn!( + "Storage verification detected {} tampered anchor(s)", + tampered + ); + } + Ok(_) => tracing::debug!("Storage verification batch completed"), + Err(e) => tracing::error!("Storage verification failed: {}", e), + } + } + }); + tracing::info!("Cron scheduler started"); } } diff --git a/sdk/python/pyproject.toml b/sdk/python/pyproject.toml index 841451c7..0d8d5683 100644 --- a/sdk/python/pyproject.toml +++ b/sdk/python/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ "requests>=2.28.0", "pydantic>=2.0.0", "python-dateutil>=2.8.0", + "base58>=2.1.0", ] [project.optional-dependencies] diff --git a/sdk/python/src/chainlogistics_sdk/__init__.py b/sdk/python/src/chainlogistics_sdk/__init__.py index fabbc9e6..562dcb56 100644 --- a/sdk/python/src/chainlogistics_sdk/__init__.py +++ b/sdk/python/src/chainlogistics_sdk/__init__.py @@ -33,6 +33,13 @@ PaginationMeta, ) from . 
import ring_signature +from .storage import ( + MAX_FILE_BYTES, + StorageBackend, + StorageBridge, + StorageBridgeConfig, + UploadResult, +) from .ring_signature import ( KeyPair, RingSignature, @@ -81,4 +88,10 @@ "sign", "verify", "aggregate_ring", + # Decentralized storage bridge + "MAX_FILE_BYTES", + "StorageBackend", + "StorageBridge", + "StorageBridgeConfig", + "UploadResult", ]
diff --git a/sdk/python/src/chainlogistics_sdk/storage.py b/sdk/python/src/chainlogistics_sdk/storage.py
new file mode 100644
index 00000000..8c3dc995
--- /dev/null
+++ b/sdk/python/src/chainlogistics_sdk/storage.py
@@ -0,0 +1,287 @@
+"""Direct IPFS / Arweave storage bridge with CAS deduplication."""
+
+from __future__ import annotations
+
+import hashlib
+import os
+from dataclasses import dataclass, field
+from enum import Enum
+from typing import Any, Dict, Optional
+
+import requests
+
+from .exceptions import ValidationError
+
+# Largest payload, in bytes, that the bridge uploads or accepts back (50 MiB).
+MAX_FILE_BYTES = 50 * 1024 * 1024
+
+
+class StorageBackend(str, Enum):
+    """Decentralized storage network a payload lives on."""
+
+    IPFS = "ipfs"
+    ARWEAVE = "arweave"
+
+
+@dataclass
+class StorageBridgeConfig:
+    """Endpoints and credentials used by :class:`StorageBridge`.
+
+    The three URL fields fall back to the ``IPFS_API_URL``, ``IPFS_GATEWAY`` and
+    ``ARWEAVE_GATEWAY`` environment variables, read when the config object is
+    created rather than when the module is imported.
+    """
+
+    ipfs_api_url: str = field(
+        default_factory=lambda: os.environ.get("IPFS_API_URL", "http://127.0.0.1:5001")
+    )
+    ipfs_gateway: str = field(
+        default_factory=lambda: os.environ.get("IPFS_GATEWAY", "https://ipfs.io/ipfs/")
+    )
+    arweave_gateway: str = field(
+        default_factory=lambda: os.environ.get("ARWEAVE_GATEWAY", "https://arweave.net")
+    )
+    # Base URL of the anchor registry API; registry calls are skipped when unset.
+    anchor_registry_url: Optional[str] = None
+    # Sent as a Bearer token on registry requests when set.
+    api_key: Optional[str] = None
+
+
+@dataclass
+class UploadResult:
+    """Outcome of :meth:`StorageBridge.upload`."""
+
+    content_hash: str
+    cid: str
+    backend: str
+    byte_size: int
+    # True when the registry already knew the hash and nothing was uploaded.
+    deduplicated: bool
+
+
+class StorageBridge:
+    """Upload manuals/PDFs directly to IPFS or Arweave — no central file silo."""
+
+    def __init__(self, config: Optional[StorageBridgeConfig] = None) -> None:
+        self.config = config or StorageBridgeConfig()
+        self.session = requests.Session()
+
+    @staticmethod
+    def content_hash(data: bytes) -> str:
+        """Return the lowercase hex SHA-256 digest of ``data``."""
+        return hashlib.sha256(data).hexdigest()
+
+    @staticmethod
+    def cid_v0_from_hash(hash_hex: str) -> str:
+        """Base58-encode a SHA-256 digest as a ``0x12 0x20`` multihash.
+
+        NOTE(review): this only re-encodes the digest it is given. The ``Hash``
+        returned by the IPFS ``add`` call is produced by the node itself, so do
+        not assume the two values are equal for the same file — confirm before
+        using this to locate content.
+
+        Raises:
+            ValidationError: if ``hash_hex`` is not hex or is not 32 bytes long.
+        """
+        import base58
+
+        try:
+            digest = bytes.fromhex(hash_hex.removeprefix("0x"))
+        except ValueError as exc:
+            raise ValidationError("hash must be a hex string") from exc
+        if len(digest) != 32:
+            raise ValidationError("hash must be 32 bytes")
+        multihash = b"\x12\x20" + digest
+        return base58.b58encode(multihash).decode("ascii")
+
+    def upload(
+        self,
+        data: bytes,
+        backend: StorageBackend = StorageBackend.IPFS,
+        product_id: Optional[str] = None,
+    ) -> UploadResult:
+        """Store ``data`` on ``backend`` unless the registry already anchors it.
+
+        When the registry holds an anchor for the payload's SHA-256, that
+        anchor's recorded CID and backend are returned and nothing is uploaded;
+        the backend in the result can then differ from the one requested.
+
+        Raises:
+            ValidationError: if ``data`` is empty or larger than MAX_FILE_BYTES.
+            requests.HTTPError: if the storage node or the registry rejects a call.
+        """
+        if not data or len(data) > MAX_FILE_BYTES:
+            raise ValidationError(
+                f"file size must be between 1 and {MAX_FILE_BYTES} bytes"
+            )
+
+        content_hash = self.content_hash(data)
+
+        # Reuse the CID stored in the registry. Deriving one locally from the
+        # content hash would not point at the bytes that were really uploaded.
+        anchor = self._lookup_anchor(content_hash)
+        if anchor is not None:
+            return UploadResult(
+                content_hash=content_hash,
+                cid=str(anchor["cid"]),
+                backend=str(anchor["storage_backend"]),
+                byte_size=len(data),
+                deduplicated=True,
+            )
+
+        if backend == StorageBackend.IPFS:
+            cid = self._upload_ipfs(data)
+        else:
+            cid = self._upload_arweave(data)
+
+        self._register_anchor(content_hash, cid, backend.value, len(data), product_id)
+
+        return UploadResult(
+            content_hash=content_hash,
+            cid=cid,
+            backend=backend.value,
+            byte_size=len(data),
+            deduplicated=False,
+        )
+
+    def verify(self, cid: str, expected_hash: str, backend: StorageBackend) -> bool:
+        """Fetch ``cid`` and compare its SHA-256 with ``expected_hash``."""
+        data = self.fetch(cid, backend)
+        return self.content_hash(data) == expected_hash.removeprefix("0x").lower()
+
+    def fetch(self, cid: str, backend: StorageBackend) -> bytes:
+        """Download ``cid`` from the configured gateway for ``backend``.
+
+        Raises:
+            requests.HTTPError: on a non-success gateway response.
+            ValidationError: once more than MAX_FILE_BYTES have been received.
+        """
+        if backend == StorageBackend.IPFS:
+            url = f"{self.config.ipfs_gateway.rstrip('/')}/{cid.lstrip('/')}"
+        else:
+            url = f"{self.config.arweave_gateway.rstrip('/')}/{cid.lstrip('/')}"
+
+        # Stream the body so an oversized response is rejected while it is
+        # arriving instead of after it has been buffered in full.
+        response = self.session.get(url, timeout=120, stream=True)
+        try:
+            response.raise_for_status()
+            chunks = []
+            received = 0
+            for chunk in response.iter_content(chunk_size=64 * 1024):
+                received += len(chunk)
+                if received > MAX_FILE_BYTES:
+                    raise ValidationError("fetched content exceeds 50MB limit")
+                chunks.append(chunk)
+        finally:
+            response.close()
+        return b"".join(chunks)
+
+    def _auth_headers(self) -> Dict[str, str]:
+        """Return the registry Authorization header, or no headers without a key."""
+        if self.config.api_key:
+            return {"Authorization": f"Bearer {self.config.api_key}"}
+        return {}
+
+    def _lookup_anchor(self, content_hash: str) -> Optional[Dict[str, Any]]:
+        """Return the registry's anchor record for ``content_hash``, if any.
+
+        Best effort: an unset registry URL, any non-success status, or a body
+        without ``cid`` and ``storage_backend`` all yield ``None`` so the caller
+        falls back to a normal upload.
+        """
+        if not self.config.anchor_registry_url:
+            return None
+        url = (
+            f"{self.config.anchor_registry_url.rstrip('/')}"
+            f"/api/v1/storage/anchors/{content_hash}"
+        )
+        response = self.session.get(url, headers=self._auth_headers(), timeout=30)
+        if not response.ok:
+            return None
+        body = response.json()
+        if not isinstance(body, dict):
+            return None
+        if not body.get("cid") or not body.get("storage_backend"):
+            return None
+        return body
+
+    def _cas_exists(self, content_hash: str) -> bool:
+        """Ask the registry whether ``content_hash`` is already anchored."""
+        if not self.config.anchor_registry_url:
+            return False
+        url = (
+            f"{self.config.anchor_registry_url.rstrip('/')}"
+            f"/api/v1/storage/exists/{content_hash}"
+        )
+        response = self.session.get(url, headers=self._auth_headers(), timeout=30)
+        # Any non-success status (404 included) is treated as "not anchored".
+        if not response.ok:
+            return False
+        body: Dict[str, Any] = response.json()
+        return bool(body.get("exists"))
+
+    def _register_anchor(
+        self,
+        content_hash: str,
+        cid: str,
+        backend: str,
+        byte_size: int,
+        product_id: Optional[str],
+    ) -> None:
+        """Record a finished upload in the anchor registry (no-op without a URL)."""
+        if not self.config.anchor_registry_url:
+            return
+        url = f"{self.config.anchor_registry_url.rstrip('/')}/api/v1/storage/anchors"
+        headers = {"Content-Type": "application/json", **self._auth_headers()}
+        payload = {
+            "content_hash": content_hash,
+            "cid": cid,
+            "storage_backend": backend,
+            "product_id": product_id,
+            "byte_size": byte_size,
+        }
+        response = self.session.post(url, json=payload, headers=headers, timeout=30)
+        # 201 is a new anchor and 409 means the hash is already anchored under
+        # another CID; both are accepted, any other error status raises.
+        if response.status_code not in (201, 409):
+            response.raise_for_status()
+
+    def _upload_ipfs(self, data: bytes) -> str:
+        """Add and pin ``data`` through the IPFS HTTP API; return its ``Hash``."""
+        url = f"{self.config.ipfs_api_url.rstrip('/')}/api/v0/add?pin=true"
+        response = self.session.post(
+            url,
+            files={"file": ("content", data, "application/octet-stream")},
+            timeout=300,
+        )
+        response.raise_for_status()
+        body: Dict[str, Any] = response.json()
+        cid = body.get("Hash")
+        if not cid:
+            raise ValidationError("IPFS response missing Hash")
+        return str(cid)
+
+    def _upload_arweave(self, data: bytes) -> str:
+        """POST ``data`` to ``<arweave_gateway>/tx`` and return the reported id.
+
+        NOTE(review): the bytes are sent as-is; confirm the configured gateway
+        accepts an unsigned raw body on this endpoint.
+        """
+        url = f"{self.config.arweave_gateway.rstrip('/')}/tx"
+        response = self.session.post(
+            url,
+            data=data,
+            headers={"Content-Type": "application/octet-stream"},
+            timeout=300,
+        )
+        response.raise_for_status()
+        text = response.text.strip()
+        # Prefer the "id" field of a JSON object; otherwise use the raw body.
+        try:
+            body = response.json()
+            if isinstance(body, dict) and body.get("id"):
+                return str(body["id"])
+        except ValueError:
+            pass
+        return text
diff --git a/sdk/python/tests/test_storage.py 
b/sdk/python/tests/test_storage.py
new file mode 100644
index 00000000..bd0b56b8
--- /dev/null
+++ b/sdk/python/tests/test_storage.py
@@ -0,0 +1,38 @@
+"""Tests for decentralized storage bridge."""
+
+import pytest
+
+from chainlogistics_sdk.exceptions import ValidationError
+from chainlogistics_sdk.storage import (
+    MAX_FILE_BYTES,
+    StorageBackend,
+    StorageBridge,
+)
+
+
+def test_content_hash_sha256() -> None:
+    # Known SHA-256 test vector for b"hello".
+    assert StorageBridge.content_hash(b"hello") == (
+        "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
+    )
+
+
+def test_cid_v0_from_hash() -> None:
+    h = StorageBridge.content_hash(b"hello")
+    cid = StorageBridge.cid_v0_from_hash(h)
+    # base58 of a 34-byte 0x12 0x20 multihash is 46 characters starting "Qm".
+    assert cid.startswith("Qm")
+    assert len(cid) == 46
+
+
+def test_rejects_oversized_file() -> None:
+    bridge = StorageBridge()
+    # Assert the SDK's own error type so unrelated failures cannot pass the test.
+    with pytest.raises(ValidationError):
+        bridge.upload(b"\x00" * (MAX_FILE_BYTES + 1), StorageBackend.IPFS)
+
+
+def test_rejects_empty_file() -> None:
+    bridge = StorageBridge()
+    with pytest.raises(ValidationError):
+        bridge.upload(b"", StorageBackend.IPFS)
diff --git a/sdk/rust/Cargo.toml b/sdk/rust/Cargo.toml index 9c7c2596..62afc16f 100644 --- a/sdk/rust/Cargo.toml +++ b/sdk/rust/Cargo.toml @@ -11,27 +11,32 @@ keywords = ["blockchain", "supply-chain", "logistics", "api"] categories = ["api-bindings", "web-programming::http-client"] [dependencies] -reqwest = { version = "0.11", features = ["json", "rustls-tls"], default-features = false } +reqwest = { version = "0.11", features = ["json", "rustls-tls", "multipart"], default-features = false } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" chrono = { version = "0.4", features = ["serde"] } uuid = { version = "1.0", features = ["v4", "serde"] } thiserror = "1.0" url = "2.0" +hex = { version = "0.4", optional = true } +bs58 = { version = "0.5", optional = true } +sha2 = { version = "0.10", optional = true } tokio = { version = "1.0", features = ["rt-multi-thread", "macros"], optional = true } # Ring-signature signing (privacy-preserving audit trail). Optional so the HTTP # client stays dependency-light when the feature is unused. 
bls12_381 = { version = "0.8", optional = true } -sha2 = { version = "0.10", optional = true } rand_core = { version = "0.6", features = ["getrandom"], optional = true } [dev-dependencies] tokio-test = "0.4" mockito = "1.0" rand_chacha = "0.3" +tokio = { version = "1.0", features = ["rt-multi-thread", "macros"] } [features] default = ["client"] client = ["tokio"] +# Direct IPFS / Arweave storage bridge with CAS dedup. +decentralized-storage = ["tokio", "sha2", "hex", "bs58"] # Enables the `ring_signature` module for anonymous audit attestations. ring-signatures = ["bls12_381", "sha2", "rand_core"] diff --git a/sdk/rust/src/lib.rs b/sdk/rust/src/lib.rs index 9874ac1b..1e7633ca 100644 --- a/sdk/rust/src/lib.rs +++ b/sdk/rust/src/lib.rs @@ -30,6 +30,9 @@ pub mod products; pub mod events; pub mod stats; +#[cfg(feature = "decentralized-storage")] +pub mod storage; + /// Privacy-preserving audit-trail ring signatures (BLS12-381). Enable with the /// `ring-signatures` feature. Interoperates with the Soroban /// `RingSignatureVerifier` / `AuditTrailContract` contracts. @@ -42,6 +45,9 @@ pub use config::Config; pub use error::{Error, Result}; pub use models::*; +#[cfg(feature = "decentralized-storage")] +pub use storage::{StorageBackend, StorageBridge, StorageBridgeConfig, UploadResult, MAX_FILE_BYTES}; + /// SDK version pub const VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/sdk/rust/src/storage.rs b/sdk/rust/src/storage.rs new file mode 100644 index 00000000..e0163597 --- /dev/null +++ b/sdk/rust/src/storage.rs @@ -0,0 +1,344 @@ +//! Decentralized storage bridge — direct IPFS / Arweave uploads with CAS dedup. +//! +//! Files go straight to configured decentralized endpoints (no central file silo). +//! Content is keyed by SHA-256; identical manuals deduplicate before upload. + +use reqwest::multipart; +use sha2::{Digest, Sha256}; + +use crate::{Error, Result}; + +/// Maximum supported file size (50 MiB). 
+pub const MAX_FILE_BYTES: u64 = 50 * 1024 * 1024; + +/// Target decentralized storage network. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum StorageBackend { + Ipfs, + Arweave, +} + +impl StorageBackend { + pub fn as_str(self) -> &'static str { + match self { + StorageBackend::Ipfs => "ipfs", + StorageBackend::Arweave => "arweave", + } + } +} + +/// Configuration for direct decentralized storage access. +#[derive(Debug, Clone)] +pub struct StorageBridgeConfig { + /// Kubo / IPFS HTTP API base (e.g. `http://127.0.0.1:5001`). + pub ipfs_api_url: String, + /// Read gateway for IPFS fetches (e.g. `https://ipfs.io/ipfs/`). + pub ipfs_gateway: String, + /// Arweave gateway for uploads and reads (e.g. `https://arweave.net`). + pub arweave_gateway: String, + /// Optional ChainLogistics API for CAS anchor registry (metadata only). + pub anchor_registry_url: Option, + pub api_key: Option, +} + +impl Default for StorageBridgeConfig { + fn default() -> Self { + Self { + ipfs_api_url: std::env::var("IPFS_API_URL") + .unwrap_or_else(|_| "http://127.0.0.1:5001".to_string()), + ipfs_gateway: std::env::var("IPFS_GATEWAY") + .unwrap_or_else(|_| "https://ipfs.io/ipfs/".to_string()), + arweave_gateway: std::env::var("ARWEAVE_GATEWAY") + .unwrap_or_else(|_| "https://arweave.net".to_string()), + anchor_registry_url: None, + api_key: None, + } + } +} + +/// Result of a successful decentralized upload. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct UploadResult { + pub content_hash: String, + pub cid: String, + pub backend: String, + pub byte_size: u64, + pub deduplicated: bool, +} + +/// Direct IPFS / Arweave bridge with content-addressed deduplication. 
+#[derive(Debug, Clone)] +pub struct StorageBridge { + config: StorageBridgeConfig, + http: reqwest::Client, +} + +impl StorageBridge { + pub fn new(config: StorageBridgeConfig) -> Result { + let http = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(300)) + .build()?; + Ok(Self { config, http }) + } + + /// SHA-256 hex digest of content. + pub fn content_hash(data: &[u8]) -> String { + hex::encode(Sha256::digest(data)) + } + + /// IPFS CIDv0 (base58btc) derived from SHA-256 multihash. + pub fn cid_v0_from_hash(hash_hex: &str) -> Result { + let hash_bytes = hex::decode(hash_hex.trim_start_matches("0x")) + .map_err(|e| Error::Validation(format!("invalid hash hex: {e}")))?; + if hash_bytes.len() != 32 { + return Err(Error::Validation("hash must be 32 bytes".into())); + } + let mut multihash = Vec::with_capacity(34); + multihash.push(0x12); // sha2-256 + multihash.push(0x20); + multihash.extend_from_slice(&hash_bytes); + Ok(bs58::encode(multihash).into_string()) + } + + /// Upload content with CAS dedup — skips re-upload when hash already exists. + pub async fn upload( + &self, + data: &[u8], + backend: StorageBackend, + product_id: Option<&str>, + ) -> Result { + if data.is_empty() || data.len() as u64 > MAX_FILE_BYTES { + return Err(Error::Validation(format!( + "file size must be between 1 and {MAX_FILE_BYTES} bytes" + ))); + } + + let content_hash = Self::content_hash(data); + + if self.cas_exists(&content_hash).await? 
{ + let cid = match backend { + StorageBackend::Ipfs => Self::cid_v0_from_hash(&content_hash)?, + StorageBackend::Arweave => content_hash.clone(), + }; + return Ok(UploadResult { + content_hash, + cid, + backend: backend.as_str().to_string(), + byte_size: data.len() as u64, + deduplicated: true, + }); + } + + let (cid, backend_str) = match backend { + StorageBackend::Ipfs => (self.upload_ipfs(data).await?, "ipfs".to_string()), + StorageBackend::Arweave => (self.upload_arweave(data).await?, "arweave".to_string()), + }; + + self.register_anchor(&content_hash, &cid, &backend_str, data.len() as u64, product_id) + .await?; + + Ok(UploadResult { + content_hash, + cid, + backend: backend_str, + byte_size: data.len() as u64, + deduplicated: false, + }) + } + + /// Fetch content from decentralized storage and verify against expected hash. + pub async fn verify( + &self, + cid: &str, + expected_hash: &str, + backend: StorageBackend, + ) -> Result { + let bytes = self.fetch(cid, backend).await?; + Ok(Self::content_hash(&bytes) == expected_hash.trim_start_matches("0x").to_lowercase()) + } + + /// Fetch raw bytes from IPFS or Arweave. 
+ pub async fn fetch(&self, cid: &str, backend: StorageBackend) -> Result> { + let url = match backend { + StorageBackend::Ipfs => format!( + "{}/{}", + self.config.ipfs_gateway.trim_end_matches('/'), + cid.trim_start_matches('/') + ), + StorageBackend::Arweave => format!( + "{}/{}", + self.config.arweave_gateway.trim_end_matches('/'), + cid.trim_start_matches('/') + ), + }; + + let response = self.http.get(&url).send().await?; + if !response.status().is_success() { + return Err(Error::api( + response.status().as_u16(), + format!("failed to fetch {cid}"), + )); + } + let bytes = response.bytes().await?; + if bytes.len() as u64 > MAX_FILE_BYTES { + return Err(Error::Validation("fetched content exceeds 50MB limit".into())); + } + Ok(bytes.to_vec()) + } + + async fn cas_exists(&self, content_hash: &str) -> Result { + let Some(base) = &self.config.anchor_registry_url else { + return Ok(false); + }; + + let mut req = self + .http + .get(format!( + "{}/api/v1/storage/exists/{}", + base.trim_end_matches('/'), + content_hash + )); + + if let Some(key) = &self.config.api_key { + req = req.bearer_auth(key); + } + + let response = req.send().await?; + if response.status().as_u16() == 404 { + return Ok(false); + } + if !response.status().is_success() { + return Ok(false); + } + + let body: serde_json::Value = response.json().await?; + Ok(body.get("exists").and_then(|v| v.as_bool()).unwrap_or(false)) + } + + async fn register_anchor( + &self, + content_hash: &str, + cid: &str, + backend: &str, + byte_size: u64, + product_id: Option<&str>, + ) -> Result<()> { + let Some(base) = &self.config.anchor_registry_url else { + return Ok(()); + }; + + let body = serde_json::json!({ + "content_hash": content_hash, + "cid": cid, + "storage_backend": backend, + "product_id": product_id, + "byte_size": byte_size, + }); + + let mut req = self + .http + .post(format!("{}/api/v1/storage/anchors", base.trim_end_matches('/'))) + .json(&body); + + if let Some(key) = &self.config.api_key { + req = 
req.bearer_auth(key); + } + + let response = req.send().await?; + if response.status().is_success() || response.status().as_u16() == 409 { + return Ok(()); + } + Err(Error::api( + response.status().as_u16(), + "failed to register anchor metadata", + )) + } + + async fn upload_ipfs(&self, data: &[u8]) -> Result { + let part = multipart::Part::bytes(data.to_vec()) + .file_name("content".to_string()) + .mime_str("application/octet-stream") + .map_err(|e| Error::Validation(e.to_string()))?; + + let form = multipart::Form::new().part("file", part); + + let url = format!( + "{}/api/v0/add?pin=true", + self.config.ipfs_api_url.trim_end_matches('/') + ); + + let response = self.http.post(&url).multipart(form).send().await?; + if !response.status().is_success() { + return Err(Error::api( + response.status().as_u16(), + "IPFS add failed", + )); + } + + let body: serde_json::Value = response.json().await?; + body.get("Hash") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + .ok_or_else(|| Error::Validation("IPFS response missing Hash".into())) + } + + async fn upload_arweave(&self, data: &[u8]) -> Result { + let url = format!( + "{}/tx", + self.config.arweave_gateway.trim_end_matches('/') + ); + + let response = self + .http + .post(&url) + .header("Content-Type", "application/octet-stream") + .body(data.to_vec()) + .send() + .await?; + + if !response.status().is_success() { + return Err(Error::api( + response.status().as_u16(), + "Arweave upload failed", + )); + } + + // Arweave gateways return the transaction id as plain text or JSON. 
+ let text = response.text().await?; + if let Ok(json) = serde_json::from_str::(&text) { + if let Some(id) = json.get("id").and_then(|v| v.as_str()) { + return Ok(id.to_string()); + } + } + Ok(text.trim().to_string()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn content_hash_is_sha256_hex() { + let hash = StorageBridge::content_hash(b"manual"); + assert_eq!(hash.len(), 64); + } + + #[test] + fn cid_v0_from_known_hash() { + let hash = StorageBridge::content_hash(b"hello"); + let cid = StorageBridge::cid_v0_from_hash(&hash).unwrap(); + assert!(cid.starts_with('Q') || cid.starts_with('b')); // base58btc + } + + #[test] + fn rejects_oversized_payload() { + let rt = tokio::runtime::Runtime::new().unwrap(); + let bridge = StorageBridge::new(StorageBridgeConfig::default()).unwrap(); + let oversized = vec![0u8; (MAX_FILE_BYTES + 1) as usize]; + let err = rt + .block_on(bridge.upload(&oversized, StorageBackend::Ipfs, None)) + .unwrap_err(); + assert!(matches!(err, Error::Validation(_))); + } +} diff --git a/smart-contract/contracts/src/error.rs b/smart-contract/contracts/src/error.rs index c4d422ae..6485f01a 100644 --- a/smart-contract/contracts/src/error.rs +++ b/smart-contract/contracts/src/error.rs @@ -135,4 +135,11 @@ pub enum Error { AuditTrailNotInitialized = 156, // Reserved for the linkable (LSAG) variant; see RING_SIGNATURE.md §5. KeyImageAlreadyUsed = 157, + + // --- Integrity Anchor / Decentralized Storage (160-169) --- + AnchorNotFound = 160, + AnchorHashMismatch = 161, + CidTooLong = 162, + FileTooLarge = 163, + InvalidStorageBackend = 164, } diff --git a/smart-contract/contracts/src/events.rs b/smart-contract/contracts/src/events.rs index 6fb62941..eb88cfc6 100644 --- a/smart-contract/contracts/src/events.rs +++ b/smart-contract/contracts/src/events.rs @@ -249,3 +249,25 @@ pub struct ChannelFinalized { pub batch_count: u64, pub state_root: BytesN<32>, } + +/// Emitted when decentralized content is anchored on-chain by content hash. 
+#[contractevent] +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct IntegrityAnchored { + pub content_hash: BytesN<32>, + pub cid: soroban_sdk::String, + pub backend: Symbol, + pub product_id: soroban_sdk::String, + pub byte_size: u64, + pub anchorer: Address, +} + +/// Emitted when off-chain verification detects content tampering. +#[contractevent] +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct IntegrityTamperFlagged { + pub content_hash: BytesN<32>, + pub cid: soroban_sdk::String, + pub product_id: soroban_sdk::String, + pub reporter: Address, +} diff --git a/smart-contract/contracts/src/integrity_anchor.rs b/smart-contract/contracts/src/integrity_anchor.rs new file mode 100644 index 00000000..2d75a209 --- /dev/null +++ b/smart-contract/contracts/src/integrity_anchor.rs @@ -0,0 +1,319 @@ +//! On-chain integrity anchors for decentralized content (manuals, PDFs, media). +//! +//! Content is stored directly on IPFS or Arweave — this contract only records the +//! content-addressed hash (SHA-256) and CID/transaction id so tamper detection can +//! compare fetched bytes against the anchor. Identical manuals deduplicate via CAS: +//! anchoring the same `content_hash` twice is idempotent. + +use soroban_sdk::{contract, contractimpl, Address, BytesN, Env, String, Symbol, Vec}; + +use crate::error::Error; +use crate::events::{IntegrityAnchored, IntegrityTamperFlagged}; +use crate::types::{ContentAnchor, DataKey}; +use crate::validation_contract::ValidationContract; +use crate::ChainLogisticsContractClient; + +/// Maximum anchored file size (50 MiB). Enforced off-chain as well; on-chain guard +/// prevents oversized metadata from being registered. +pub const MAX_ANCHOR_FILE_BYTES: u64 = 50 * 1024 * 1024; +pub const MAX_CID_LEN: u32 = 128; + +fn get_main_contract(env: &Env) -> Option
{ + env.storage().persistent().get(&DataKey::MainContract) +} + +fn require_init(env: &Env) -> Result<(), Error> { + if get_main_contract(env).is_none() { + return Err(Error::NotInitialized); + } + Ok(()) +} + +fn require_not_paused(env: &Env) -> Result<(), Error> { + let main_contract = get_main_contract(env).ok_or(Error::NotInitialized)?; + let main_client = ChainLogisticsContractClient::new(env, &main_contract); + if main_client.is_paused() { + return Err(Error::ContractPaused); + } + Ok(()) +} + +fn get_anchor(env: &Env, content_hash: &BytesN<32>) -> Option { + env.storage() + .persistent() + .get(&DataKey::ContentAnchor(content_hash.clone())) +} + +fn put_anchor(env: &Env, anchor: &ContentAnchor) { + env.storage() + .persistent() + .set(&DataKey::ContentAnchor(anchor.content_hash.clone()), anchor); +} + +fn append_product_anchor(env: &Env, product_id: &String, content_hash: &BytesN<32>) { + let mut hashes: Vec> = env + .storage() + .persistent() + .get(&DataKey::ProductContentAnchors(product_id.clone())) + .unwrap_or_else(|| Vec::new(env)); + + let mut found = false; + for i in 0..hashes.len() { + if hashes.get(i).unwrap() == *content_hash { + found = true; + break; + } + } + if !found { + hashes.push_back(content_hash.clone()); + env.storage() + .persistent() + .set(&DataKey::ProductContentAnchors(product_id.clone()), &hashes); + } +} + +fn validate_backend_env(env: &Env, backend: &Symbol) -> Result<(), Error> { + let ipfs = Symbol::new(env, "ipfs"); + let arweave = Symbol::new(env, "arweave"); + if backend != &ipfs && backend != &arweave { + return Err(Error::InvalidStorageBackend); + } + Ok(()) +} + +#[contract] +pub struct IntegrityAnchorContract; + +#[contractimpl] +impl IntegrityAnchorContract { + /// Wire this satellite contract to the main ChainLogistics contract. 
+ pub fn init(env: Env, main_contract: Address) { + ValidationContract::validate_contract_address(&env, &main_contract) + .unwrap_or_else(|_| panic!("invalid main contract")); + env.storage() + .persistent() + .set(&DataKey::MainContract, &main_contract); + } + + /// Anchor a content hash and its decentralized storage CID. + /// + /// CAS deduplication: if `content_hash` is already anchored with the same CID, + /// this call succeeds without duplicating state. + pub fn anchor_content( + env: Env, + anchorer: Address, + content_hash: BytesN<32>, + cid: String, + backend: Symbol, + product_id: String, + byte_size: u64, + ) -> Result<(), Error> { + require_init(&env)?; + require_not_paused(&env)?; + anchorer.require_auth(); + + ValidationContract::non_empty(&cid)?; + ValidationContract::max_len(&cid, MAX_CID_LEN)?; + ValidationContract::non_empty(&product_id)?; + ValidationContract::max_len(&product_id, ValidationContract::MAX_PRODUCT_ID_LEN)?; + validate_backend_env(&env, &backend)?; + + if byte_size == 0 || byte_size > MAX_ANCHOR_FILE_BYTES { + return Err(Error::FileTooLarge); + } + + if let Some(existing) = get_anchor(&env, &content_hash) { + if existing.cid == cid && existing.backend == backend { + return Ok(()); + } + return Err(Error::AnchorHashMismatch); + } + + let anchor = ContentAnchor { + content_hash: content_hash.clone(), + cid: cid.clone(), + backend: backend.clone(), + product_id: product_id.clone(), + byte_size, + anchored_at: env.ledger().timestamp(), + anchored_by: anchorer.clone(), + tamper_detected: false, + }; + + put_anchor(&env, &anchor); + append_product_anchor(&env, &product_id, &content_hash); + + IntegrityAnchored { + content_hash, + cid, + backend, + product_id, + byte_size, + anchorer, + } + .publish(&env); + + Ok(()) + } + + /// Return the on-chain anchor for a content hash, if present. 
+ pub fn get_anchor(env: Env, content_hash: BytesN<32>) -> Result { + require_init(&env)?; + get_anchor(&env, &content_hash).ok_or(Error::AnchorNotFound) + } + + /// Whether a content hash has been anchored (CAS lookup). + pub fn is_anchored(env: Env, content_hash: BytesN<32>) -> Result { + require_init(&env)?; + Ok(get_anchor(&env, &content_hash).is_some()) + } + + /// List content hashes anchored for a product. + pub fn get_product_anchors(env: Env, product_id: String) -> Result>, Error> { + require_init(&env)?; + ValidationContract::non_empty(&product_id)?; + Ok(env + .storage() + .persistent() + .get(&DataKey::ProductContentAnchors(product_id)) + .unwrap_or_else(|| Vec::new(&env))) + } + + /// Record that off-chain verification detected tampering. + pub fn flag_tamper( + env: Env, + reporter: Address, + content_hash: BytesN<32>, + ) -> Result<(), Error> { + require_init(&env)?; + reporter.require_auth(); + + let mut anchor = get_anchor(&env, &content_hash).ok_or(Error::AnchorNotFound)?; + if anchor.tamper_detected { + return Ok(()); + } + + anchor.tamper_detected = true; + put_anchor(&env, &anchor); + + IntegrityTamperFlagged { + content_hash: content_hash.clone(), + cid: anchor.cid.clone(), + product_id: anchor.product_id.clone(), + reporter, + } + .publish(&env); + + Ok(()) + } +} + +#[cfg(test)] +mod test_integrity_anchor { + use super::*; + use crate::{ChainLogisticsContract, ChainLogisticsContractClient}; + use soroban_sdk::{testutils::Address as _, Address, BytesN, Env, String, Symbol}; + + fn setup(env: &Env) -> (IntegrityAnchorContractClient<'static>, Address) { + let cl_id = env.register_contract(None, ChainLogisticsContract); + let ia_id = env.register_contract(None, IntegrityAnchorContract); + let cl_client = ChainLogisticsContractClient::new(env, &cl_id); + let ia_client = IntegrityAnchorContractClient::new(env, &ia_id); + + let admin = Address::generate(env); + let auth_contract = Address::generate(env); + cl_client.init(&admin, &auth_contract); + 
ia_client.init(&cl_id); + (ia_client, admin) + } + + fn sample_hash(env: &Env, byte: u8) -> BytesN<32> { + BytesN::from_array(env, &[byte; 32]) + } + + #[test] + fn anchor_and_cas_dedup() { + let env = Env::default(); + env.mock_all_auths(); + let (client, anchorer) = setup(&env); + + let hash = sample_hash(&env, 1); + let cid = String::from_str(&env, "QmTest123"); + let backend = Symbol::new(&env, "ipfs"); + let product_id = String::from_str(&env, "PROD-001"); + + client.anchor_content( + &anchorer, + &hash, + &cid, + &backend, + &product_id, + &1024, + ); + + assert!(client.is_anchored(&hash)); + let anchor = client.get_anchor(&hash); + assert_eq!(anchor.cid, cid); + assert_eq!(anchor.byte_size, 1024); + + // Idempotent re-anchor with same hash + CID + client.anchor_content( + &anchorer, + &hash, + &cid, + &backend, + &product_id, + &1024, + ); + + let anchors = client.get_product_anchors(&product_id); + assert_eq!(anchors.len(), 1); + } + + #[test] + fn rejects_oversized_file_metadata() { + let env = Env::default(); + env.mock_all_auths(); + let (client, anchorer) = setup(&env); + + let hash = sample_hash(&env, 2); + let cid = String::from_str(&env, "QmBig"); + let backend = Symbol::new(&env, "ipfs"); + let product_id = String::from_str(&env, "PROD-002"); + + let result = client.try_anchor_content( + &anchorer, + &hash, + &cid, + &backend, + &product_id, + &(MAX_ANCHOR_FILE_BYTES + 1), + ); + assert_eq!(result, Err(Ok(Error::FileTooLarge))); + } + + #[test] + fn flag_tamper_sets_state() { + let env = Env::default(); + env.mock_all_auths(); + let (client, anchorer) = setup(&env); + + let hash = sample_hash(&env, 3); + let cid = String::from_str(&env, "QmTamper"); + let backend = Symbol::new(&env, "arweave"); + let product_id = String::from_str(&env, "PROD-003"); + + client.anchor_content( + &anchorer, + &hash, + &cid, + &backend, + &product_id, + &4096, + ); + + client.flag_tamper(&anchorer, &hash); + let anchor = client.get_anchor(&hash); + 
assert!(anchor.tamper_detected); + } +} diff --git a/smart-contract/contracts/src/lib.rs b/smart-contract/contracts/src/lib.rs index a2151fe3..c6e99109 100644 --- a/smart-contract/contracts/src/lib.rs +++ b/smart-contract/contracts/src/lib.rs @@ -49,6 +49,8 @@ mod product_transfer; #[cfg(not(target_arch = "wasm32"))] mod ring_signature; #[cfg(not(target_arch = "wasm32"))] +mod integrity_anchor; +#[cfg(not(target_arch = "wasm32"))] mod state_channel; #[cfg(not(target_arch = "wasm32"))] mod stats; @@ -106,6 +108,8 @@ pub use product_transfer::*; #[cfg(not(target_arch = "wasm32"))] pub use ring_signature::*; #[cfg(not(target_arch = "wasm32"))] +pub use integrity_anchor::*; +#[cfg(not(target_arch = "wasm32"))] pub use state_channel::*; #[cfg(not(target_arch = "wasm32"))] pub use stats::*; diff --git a/smart-contract/contracts/src/types.rs b/smart-contract/contracts/src/types.rs index 430e3e1f..9277e6d7 100644 --- a/smart-contract/contracts/src/types.rs +++ b/smart-contract/contracts/src/types.rs @@ -183,6 +183,9 @@ pub enum DataKey { Channel(u64), // Channel state by channel id ChannelSeq, // Monotonic channel id counter ProductChannels(String), // Channel ids opened against a product + // ── Integrity Anchor (decentralized content) ───────────────────────── + ContentAnchor(BytesN<32>), // Anchor keyed by SHA-256 content hash + ProductContentAnchors(String), // Content hashes linked to a product } // ─── Event Types ─────────────────────────────────────────────────────────── @@ -354,6 +357,22 @@ pub struct SignedState { pub sig_b: BytesN<64>, } +/// On-chain anchor for content stored on IPFS or Arweave (manuals, PDFs, media). +/// Keyed by SHA-256 content hash for CAS deduplication. +#[contracttype] +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct ContentAnchor { + pub content_hash: BytesN<32>, + pub cid: String, + /// Storage backend: `ipfs` or `arweave`. 
+ pub backend: Symbol, + pub product_id: String, + pub byte_size: u64, + pub anchored_at: u64, + pub anchored_by: Address, + pub tamper_detected: bool, +} + /// On-chain record of an IoT tracking state channel. #[contracttype] #[derive(Clone, Debug, Eq, PartialEq)]