Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
-- Decentralized content integrity anchors (IPFS / Arweave CAS registry)
-- Files are stored directly on decentralized networks; this table tracks
-- anchors for periodic tamper verification (no central file silo).

CREATE TABLE IF NOT EXISTS content_anchors (
content_hash TEXT PRIMARY KEY,
cid TEXT NOT NULL,
storage_backend TEXT NOT NULL CHECK (storage_backend IN ('ipfs', 'arweave')),
product_id TEXT,
byte_size BIGINT NOT NULL CHECK (byte_size > 0 AND byte_size <= 52428800),
mime_type TEXT,
anchored_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
anchored_by TEXT,
last_verified_at TIMESTAMPTZ,
verification_status TEXT NOT NULL DEFAULT 'pending'
CHECK (verification_status IN ('pending', 'verified', 'tampered', 'unavailable')),
tamper_alert_sent BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_content_anchors_product_id ON content_anchors(product_id);
CREATE INDEX IF NOT EXISTS idx_content_anchors_verification
ON content_anchors(verification_status, last_verified_at NULLS FIRST);

CREATE TABLE IF NOT EXISTS content_tamper_alerts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
content_hash TEXT NOT NULL REFERENCES content_anchors(content_hash) ON DELETE CASCADE,
expected_hash TEXT NOT NULL,
actual_hash TEXT,
cid TEXT NOT NULL,
storage_backend TEXT NOT NULL,
product_id TEXT,
detected_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
alert_sent BOOLEAN NOT NULL DEFAULT FALSE
);

CREATE INDEX IF NOT EXISTS idx_content_tamper_alerts_detected_at
ON content_tamper_alerts(detected_at DESC);
1 change: 1 addition & 0 deletions backend/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ pub mod batch;
// pub mod supplier; // Temporarily disabled
pub mod predictive_routing;
pub mod physics_model;
pub mod storage;
205 changes: 205 additions & 0 deletions backend/src/handlers/storage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
use axum::{
extract::{Path, State},
http::StatusCode,
response::Json,
};
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

use crate::{
error::AppError,
services::storage_integrity_service::{RegisterAnchorRequest, MAX_FILE_BYTES},
AppState,
};

#[derive(Debug, Serialize, ToSchema)]
pub struct AnchorResponse {
pub content_hash: String,
pub cid: String,
pub storage_backend: String,
pub product_id: Option<String>,
pub byte_size: i64,
pub mime_type: Option<String>,
pub verification_status: String,
pub anchored_at: chrono::DateTime<chrono::Utc>,
}

#[derive(Debug, Serialize, ToSchema)]
pub struct ExistsResponse {
pub content_hash: String,
pub exists: bool,
}

#[derive(Debug, Deserialize, ToSchema)]
pub struct RegisterAnchorBody {
pub content_hash: String,
pub cid: String,
pub storage_backend: String,
#[serde(alias = "productId")]
pub product_id: Option<String>,
pub byte_size: u64,
pub mime_type: Option<String>,
#[serde(alias = "anchoredBy")]
pub anchored_by: Option<String>,
}

fn to_response(anchor: crate::services::storage_integrity_service::ContentAnchor) -> AnchorResponse {
AnchorResponse {
content_hash: anchor.content_hash,
cid: anchor.cid,
storage_backend: anchor.storage_backend,
product_id: anchor.product_id,
byte_size: anchor.byte_size,
mime_type: anchor.mime_type,
verification_status: anchor.verification_status,
anchored_at: anchor.anchored_at,
}
}

/// Register a content integrity anchor (CAS β€” duplicate hash + CID is idempotent).
#[utoipa::path(
post,
path = "/api/v1/storage/anchors",
request_body = RegisterAnchorBody,
responses(
(status = 201, description = "Anchor registered", body = AnchorResponse),
(status = 400, description = "Invalid request"),
(status = 409, description = "Hash mismatch")
),
tag = "storage"
)]
pub async fn register_anchor(
State(state): State<AppState>,
Json(body): Json<RegisterAnchorBody>,
) -> Result<(StatusCode, Json<AnchorResponse>), AppError> {
if body.byte_size == 0 || body.byte_size > MAX_FILE_BYTES {
return Err(AppError::BadRequest(format!(
"byte_size must be between 1 and {}",
MAX_FILE_BYTES
)));
}

if body.content_hash.len() != 64 {
return Err(AppError::BadRequest(
"content_hash must be a 64-character hex SHA-256 digest".into(),
));
}

let req = RegisterAnchorRequest {
content_hash: body.content_hash.to_lowercase(),
cid: body.cid,
storage_backend: body.storage_backend,
product_id: body.product_id,
byte_size: body.byte_size,
mime_type: body.mime_type,
anchored_by: body.anchored_by,
};

let anchor = state
.storage_integrity_service
.register_anchor(&req)
.await
.map_err(|e| {
if e.to_string().contains("different CID") {
AppError::AlreadyExists(e.to_string())
} else if e.to_string().contains("must be") {
AppError::BadRequest(e.to_string())
} else {
AppError::Database(e)
}
})?;

Ok((StatusCode::CREATED, Json(to_response(anchor))))
}

/// CAS existence check for content-hash deduplication before upload.
#[utoipa::path(
get,
path = "/api/v1/storage/exists/{content_hash}",
params(("content_hash" = String, Path, description = "SHA-256 hex digest")),
responses((status = 200, description = "Existence result", body = ExistsResponse)),
tag = "storage"
)]
pub async fn check_exists(
State(state): State<AppState>,
Path(content_hash): Path<String>,
) -> Result<Json<ExistsResponse>, AppError> {
let exists = state
.storage_integrity_service
.exists(&content_hash.to_lowercase())
.await
.map_err(AppError::Database)?;

Ok(Json(ExistsResponse {
content_hash,
exists,
}))
}

/// Get anchor metadata by content hash.
#[utoipa::path(
get,
path = "/api/v1/storage/anchors/{content_hash}",
params(("content_hash" = String, Path, description = "SHA-256 hex digest")),
responses(
(status = 200, description = "Anchor found", body = AnchorResponse),
(status = 404, description = "Not found")
),
tag = "storage"
)]
pub async fn get_anchor(
State(state): State<AppState>,
Path(content_hash): Path<String>,
) -> Result<Json<AnchorResponse>, AppError> {
let anchor = state
.storage_integrity_service
.get_anchor(&content_hash.to_lowercase())
.await
.map_err(AppError::Database)?
.ok_or_else(|| AppError::NotFound("Anchor not found".into()))?;

Ok(Json(to_response(anchor)))
}

/// List anchors for a product.
#[utoipa::path(
get,
path = "/api/v1/storage/products/{product_id}/anchors",
params(("product_id" = String, Path, description = "Product identifier")),
responses((status = 200, description = "Product anchors", body = [AnchorResponse])),
tag = "storage"
)]
pub async fn list_product_anchors(
State(state): State<AppState>,
Path(product_id): Path<String>,
) -> Result<Json<Vec<AnchorResponse>>, AppError> {
let anchors = state
.storage_integrity_service
.list_product_anchors(&product_id)
.await
.map_err(AppError::Database)?;

Ok(Json(anchors.into_iter().map(to_response).collect()))
}

/// Trigger on-demand verification (admin/cron hook).
#[utoipa::path(
post,
path = "/api/v1/storage/verify",
responses((status = 200, description = "Verification batch completed")),
tag = "storage"
)]
pub async fn trigger_verification(
State(state): State<AppState>,
) -> Result<Json<serde_json::Value>, AppError> {
let tampered = state
.storage_integrity_service
.verify_pending_anchors()
.await
.map_err(AppError::Database)?;

Ok(Json(serde_json::json!({
"status": "completed",
"tampered_count": tampered
})))
}
10 changes: 9 additions & 1 deletion backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ use services::{
CollaborationService, EventService, FinancialService, PredictiveRoutingService,
ProductService, RecallService, SyncService, UserService,
CollaborationService, EventService, FinancialService, IoTService, PhysicsModelService, PredictiveRoutingService,
ProductService, QualityService, RecallService, RegulatoryService, SupplierService,
ProductService, QualityService, RecallService, RegulatoryService, StorageConfig,
StorageIntegrityService, SupplierService,
SyncService, UserService, MercuryIndexer, MercuryConfig, RuleEngine, SagaManager,
RedisWorkerPool, WorkerConfig, TrackingEventProcessor, get_default_rules,
get_product_registration_saga, NoopAction, EventProcessingHandler, RuleEvaluationHandler,
Expand Down Expand Up @@ -82,6 +83,7 @@ pub struct AppState {
pub rule_engine: Arc<RuleEngine>,
pub saga_manager: Arc<SagaManager>,
pub worker_pool: Arc<RedisWorkerPool>,
pub storage_integrity_service: Arc<StorageIntegrityService>,
}

impl AppState {
Expand Down Expand Up @@ -120,6 +122,11 @@ impl AppState {
let predictive_routing_service =
Arc::new(PredictiveRoutingService::new(db.pool().clone()));
let physics_model_service = Arc::new(PhysicsModelService::new(db.pool().clone()));
let storage_integrity_service = Arc::new(StorageIntegrityService::new(
db.pool().clone(),
redis_client.clone(),
StorageConfig::default(),
));

// Initialize Mercury streaming indexer
let mercury_config = MercuryConfig::default();
Expand Down Expand Up @@ -193,6 +200,7 @@ impl AppState {
rule_engine,
saga_manager,
worker_pool,
storage_integrity_service,
})
}
}
Expand Down
31 changes: 31 additions & 0 deletions backend/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub fn api_routes() -> Router<AppState> {
.nest("/api/v1/monitoring", monitoring_routes())
.nest("/api/v1/collaboration", collaboration_routes())
.nest("/api/v1/routing", routing_routes())
.nest("/api/v1/storage", storage_routes())
// .nest("/api/v1/iot", iot_routes()) // Temporarily disabled
// .nest("/api/v1/quality", quality_routes()) // Temporarily disabled
// .nest("/api/v1/regulatory", regulatory_routes()) // Temporarily disabled
Expand Down Expand Up @@ -380,3 +381,33 @@ fn physics_routes() -> Router<AppState> {
crate::middleware::rate_limit::rate_limit_middleware,
))
}

fn storage_routes() -> Router<AppState> {
Router::new()
.route(
"/anchors",
post(crate::handlers::storage::register_anchor),
)
.route(
"/anchors/:content_hash",
get(crate::handlers::storage::get_anchor),
)
.route(
"/exists/:content_hash",
get(crate::handlers::storage::check_exists),
)
.route(
"/products/:product_id/anchors",
get(crate::handlers::storage::list_product_anchors),
)
.route(
"/verify",
post(crate::handlers::storage::trigger_verification).layer(middleware::from_fn(
require_role(vec![UserRole::Administrator, UserRole::Auditor]),
)),
)
.layer(middleware::from_fn(api_key_auth))
.layer(middleware::from_fn(
crate::middleware::rate_limit::rate_limit_middleware,
))
}
6 changes: 6 additions & 0 deletions backend/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,9 @@ pub use physics_model_service::PhysicsModelService;

pub mod iot_twin_sync_service;
pub use iot_twin_sync_service::IoTTwinSyncService;

pub mod storage_integrity_service;
pub use storage_integrity_service::{
ContentAnchor, RegisterAnchorRequest, StorageBackend, StorageConfig,
StorageIntegrityService, TamperAlert, VerificationStatus, MAX_FILE_BYTES,
};
Loading
Loading