From b9ee28d9602bbbc1b63dab0ac9796cb14f2ab7eb Mon Sep 17 00:00:00 2001 From: Mayckon Giovani Date: Sat, 11 Apr 2026 12:53:47 -0400 Subject: [PATCH] security: harden anti-rollback floors for fleet policy and revocations --- docs/coap.md | 243 ++++++++----------------------- docs/mqtt.md | 36 ++++- src/lib.rs | 4 + src/mqtt_control_plane.rs | 264 ++++++++++++++++++++++++++++++++++ src/mqtt_secure.rs | 275 +++++++++++++++++++++++++++++++++--- src/security/provider.rs | 51 +++++++ src/security/time.rs | 15 +- tests/control_plane_sync.rs | 84 +++++++++++ tests/mqtt_invariants.rs | 124 ++++++++++++++++ 9 files changed, 881 insertions(+), 215 deletions(-) create mode 100644 src/mqtt_control_plane.rs create mode 100644 tests/control_plane_sync.rs diff --git a/docs/coap.md b/docs/coap.md index 191ef83..de631a6 100644 --- a/docs/coap.md +++ b/docs/coap.md @@ -1,223 +1,90 @@ -# Secure CoAP Client +# CoAP Security (What Exists vs. What Is “Industrial”) -This document provides detailed information about the secure CoAP client implementation in PQC-IIoT. +This crate currently exposes **two CoAP security modes** under `pqc_iiot::coap_secure` when `coap-std` is enabled: -## Table of Contents +1. `SecureCoapClient`: **signed payloads** (authenticity only). +2. `SecureCoapSessionClient` / `SecureCoapSessionServer`: a **custom session + symmetric ratchet** that provides confidentiality + integrity + replay protection at the application layer. -- [Overview](#overview) -- [Usage](#usage) -- [Security Features](#security-features) -- [Error Handling](#error-handling) -- [Performance](#performance) -- [Examples](#examples) +Neither mode is a standards-compliant replacement for **OSCORE (RFC 8613)** or **DTLS**. For critical IIoT deployments where interoperability and compliance matter, OSCORE/DTLS is still the correct transport/security boundary. -## Overview +## Threat Model (Practical) -The `SecureCoapClient` provides a secure CoAP client implementation with post-quantum cryptography. It uses Kyber for key encapsulation and Falcon for message signing, making it suitable for resource-constrained IIoT devices. +- The network is adversarial: MITM, replay, reordering, injection. +- UDP transport provides no integrity/confidentiality by itself. +- You must assume loss, duplication, and reordering. +- Identity must be explicit (pinned keys or provisioning-backed certs); TOFU is not a “critical” baseline. -## Usage +## Mode A: Signed Payloads (`SecureCoapClient`) -### Basic Setup +This mode signs the application payload and appends a detached Falcon signature: -```rust -use pqc_iiot::coap_secure::SecureCoapClient; -use std::net::SocketAddr; - -fn main() -> Result<(), Box> { - // Create secure CoAP client - let client = SecureCoapClient::new()?; - - // Server address - let server_addr = "127.0.0.1:5683".parse::()?; - - Ok(()) -} ``` - -### Sending Requests - -```rust -// Send a secure GET request -let path = "sensors/temperature"; -let response = client.get(server_addr, path)?; - -// Send a secure POST request with payload -let payload = b"25.5"; -let response = client.post(server_addr, path, payload)?; -``` - -### Resource Discovery - -```rust -// Discover resources -let resources = client.discover(server_addr)?; -for resource in resources { - println!("Discovered resource: {}", resource); -} -``` - -## Security Features - -### Message Protection - -- **Encryption**: Messages are encrypted using Kyber -- **Signing**: Messages are signed using Falcon -- **Replay Protection**: Message IDs and timestamps -- **Path Validation**: Secure resource paths - -### Key Management - -- Automatic key rotation -- Secure key storage -- Session key establishment - -## Error Handling - -The client uses a custom error type for CoAP operations: - -```rust -pub enum Error { - /// Network error - NetworkError(String), - /// Request error - RequestError(String), - /// Response error - ResponseError(String), - /// Security error - SecurityError(String), -} +[message][signature][sig_len_be_u16] ``` -## Performance - -### Message Overhead - -| Component | Size (bytes) | -|-----------|--------------| -| Header | 16 | -| Signature | 64 | -| Ciphertext| 128 | - -### Processing Time - -| Operation | Time (ms) | -|-----------|-----------| -| GET | 1.5 | -| POST | 1.8 | -| Discovery | 2.1 | - -## Best Practices +### Properties -1. **Connection Management** - - Use DTLS for transport security - - Implement retry logic - - Monitor connection health +- Provides **end-to-end authenticity** *if* the peer’s Falcon public key is pinned. +- Does **not** provide confidentiality. +- Does **not** provide replay protection (beyond whatever the application does at higher layers). +- Does **not** protect CoAP headers/options (only the payload). -2. **Resource Management** - - Cache discovered resources - - Implement observation patterns - - Handle resource updates - -3. **Security** - - Rotate keys regularly - - Validate resource paths - - Monitor for anomalies - -## Examples - -### Complete Example +### Usage Sketch ```rust use pqc_iiot::coap_secure::SecureCoapClient; use std::net::SocketAddr; -fn main() -> Result<(), Box> { - // Create client - let client = SecureCoapClient::new()?; - - // Server address - let server_addr = "127.0.0.1:5683".parse::()?; - - // Discover resources - let resources = client.discover(server_addr)?; - println!("Discovered resources: {:?}", resources); - - // Send temperature reading - let path = "sensors/temperature"; - let payload = b"25.5"; - let response = client.post(server_addr, path, payload)?; - println!("Response: {:?}", response); - - Ok(()) -} -``` - -## Integration +let server: SocketAddr = "127.0.0.1:5683".parse().unwrap(); -### With IIoT Systems +// Client generates its own signing keys on creation. +let mut client = SecureCoapClient::new().unwrap() + .with_peer_sig_pk(/* pinned server Falcon pk */ vec![]); -The secure CoAP client is designed to integrate with IIoT systems: - -1. **Device Management** - - Secure device registration - - Firmware updates - - Configuration management - -2. **Data Collection** - - Secure sensor data - - Resource observation - - Historical data access +let resp = client.get(server, "sensors/temp").unwrap(); +let plaintext = client.verify_response(&resp).unwrap(); +``` -3. **Command and Control** - - Secure command execution - - Status monitoring - - Error reporting +## Mode B: Custom Secure Sessions (`SecureCoapSessionClient`) -## Troubleshooting +This mode implements an authenticated session handshake over a fixed CoAP path and then encrypts subsequent payloads using: -Common issues and solutions: +- Ephemeral Kyber KEM + ephemeral X25519 to derive initial chain keys. +- A symmetric ratchet (HKDF) to evolve message keys. +- AES-256-GCM for AEAD encryption. +- A skipped-key window to tolerate bounded out-of-order delivery. +- AAD binds the ciphertext to `(sender_id, receiver_id, code, path, token, session_id, msg_num)`. -1. **Connection Issues** - - Check network connectivity - - Verify server configuration - - Review security settings +### Properties -2. **Performance Issues** - - Monitor request rates - - Check resource usage - - Optimize payload size +- Provides **confidentiality + integrity** of payloads after session establishment. +- Provides **anti-replay** and bounded out-of-order tolerance. +- Is not interoperable: **not OSCORE/DTLS** (no standards-based security context, no COSE/OSCORE option, no DTLS record layer). -3. **Security Issues** - - Verify key rotation - - Check signature validation - - Monitor for attacks +### Usage Sketch -## Advanced Features +```rust +use pqc_iiot::coap_secure::SecureCoapSessionClient; +use std::net::SocketAddr; -### Resource Observation +let server: SocketAddr = "127.0.0.1:5683".parse().unwrap(); -```rust -// Observe a resource -let observer = client.observe(server_addr, path)?; +// peer_sig_pk must be pinned (server Falcon pk). +let mut client = SecureCoapSessionClient::new("device-1", "gw-1", /* peer_sig_pk */ vec![]) + .unwrap(); -// Handle updates -while let Some(update) = observer.next_update() { - println!("Resource updated: {:?}", update); -} +client.connect(server).unwrap(); +let resp = client.get(server, "test/resource").unwrap(); +assert!(!resp.message.payload.is_empty()); ``` -### Block-wise Transfers +## “Industrial” Path (OSCORE/DTLS) -```rust -// Send large payload in blocks -let large_payload = vec![0u8; 1024]; -let response = client.post_blockwise(server_addr, path, &large_payload)?; -``` +If the release target is **critical IIoT**, the correct endpoint is not “custom crypto in a CoAP module”; it is a **standards-defined transport/security context** with explicit compliance story: -### Multicast Support +- **OSCORE** for CoAP over UDP, typically with **EDHOC** for key establishment. +- **DTLS** for securing UDP transport when OSCORE is not viable. -```rust -// Send multicast request -let multicast_addr = "224.0.1.187:5683".parse::()?; -let responses = client.multicast_get(multicast_addr, path)?; -``` \ No newline at end of file +This crate currently treats OSCORE/DTLS as out-of-scope for the `coap-std` implementation and documents the custom session mode as a practical building block, not an interoperability baseline. + +If you want this repository to be “market standard”, the next concrete engineering step is to add an OSCORE/DTLS backend (feature-gated) and make the custom session mode explicitly “experimental / internal”. diff --git a/docs/mqtt.md b/docs/mqtt.md index b2e2cd9..be40699 100644 --- a/docs/mqtt.md +++ b/docs/mqtt.md @@ -116,7 +116,40 @@ Local revocation is tracked in the keystore and enforced during key exchange. Th - `KeyStore::revoke_key_id(peer_id, key_id)` -This is intentionally local-policy-driven: revocation distribution is an operational problem (out-of-band channel, broker ACLs, or a control plane). +Revocation distribution is an operational problem. The crate implements a minimal, broker-based control plane: + +- Signed revocation updates are published (retained) on `pqc/revocations/v1`. +- Clients can publish best-effort sync requests on `pqc/revocations/sync/v1` to trigger a re-publish by a gateway/CA service. + +The responder side is intentionally minimal and lives in `pqc_iiot::mqtt_control_plane::MqttControlPlane`. + +### Fleet Policy (Signed, Monotonic, Partition-Aware) + +Fleet security policy is a CA-signed update stream (not broker-trusted configuration): + +- Updates are published (retained) on `pqc/policy/v1` as `FleetPolicyUpdate`. +- Clients can publish sync requests on `pqc/policy/sync/v1`. + +Policy is treated as an explicit **security gate**: + +- `require_sessions`: disallows v1 per-message hybrid encryption and requires v2 sessions (ratchet). +- `min_revocation_seq`: fail-closed until emergency revocations are caught up. +- `ttl_secs`: when secure time is available, new handshakes and encrypted sends fail-closed once the policy becomes stale. +- `require_rollback_resistant_storage`: fail-closed unless the provider backend is rollback resistant. + +### Anti-Rollback Floors (Sealed Monotonic Counters) + +Critical fleets must assume filesystem compromise and rollback attempts. To model this explicitly, the client persists: + +- A **secure time floor** (unix seconds) under `pqc-iiot:time-floor:v1:`. +- A **fleet policy sequence floor** under `pqc-iiot:fleet-policy-seq:v1:`. +- A **revocation sequence floor** under `pqc-iiot:revocation-seq:v1:`. +- A **keystore generation** bound to a sealed monotonic counter and a sealed file digest. + +Semantics: + +- If a sealed floor indicates a higher `seq` than the locally loaded policy/revocation state, the client **fails closed** on security-sensitive operations and requests a control-plane sync. +- Rollback resistance is only as strong as the provider backend. For software-only providers, these are best-effort signals; for TPM/HSM/TEE-backed providers, they become enforceable invariants. ## Remote Attestation (Optional, Verifier-Driven) @@ -136,4 +169,3 @@ Verification rule (current simplified policy): - `ak_public_key` must match the peer's certified `sig_pk` (software-provider simplification; production should use a distinct AK certified by TPM/TEE). Only after this does the verifier set `peer.is_trusted = true` and `is_peer_ready(peer) == true`. - diff --git a/src/lib.rs b/src/lib.rs index e2a6f53..b66ac87 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -82,6 +82,10 @@ pub mod crypto { #[cfg(feature = "mqtt")] pub mod mqtt_secure; +/// MQTT control-plane helpers (policy/revocation sync responder). +#[cfg(feature = "mqtt")] +pub mod mqtt_control_plane; + /// Secure CoAP client implementation #[cfg(feature = "coap")] pub mod coap_secure; diff --git a/src/mqtt_control_plane.rs b/src/mqtt_control_plane.rs new file mode 100644 index 0000000..a64cea4 --- /dev/null +++ b/src/mqtt_control_plane.rs @@ -0,0 +1,264 @@ +//! Minimal MQTT control-plane responder (policy + revocation sync). +//! +//! This module is a practical building block for fleet operations: +//! - clients publish sync requests to `pqc/policy/sync/v1` and `pqc/revocations/sync/v1` +//! - a gateway/CA service responds by re-publishing the latest signed updates as retained messages +//! on `pqc/policy/v1` and `pqc/revocations/v1` +//! +//! Security model: +//! - the broker/network is untrusted; updates must be signed by the CA and verified by clients +//! - sync requests are unauthenticated hints; treat them as best-effort, rate-limited triggers + +use crate::security::policy::FleetPolicyUpdate; +use crate::security::revocation::RevocationUpdate; +use crate::{Error, Result}; +use log::{debug, info, warn}; +use rumqttc::{Client, Event, MqttOptions, Packet, Publish, QoS}; +use serde::{Deserialize, Serialize}; +use std::time::Duration; + +const DEFAULT_POLICY_TOPIC: &str = "pqc/policy/v1"; +const DEFAULT_POLICY_SYNC_TOPIC: &str = "pqc/policy/sync/v1"; +const DEFAULT_REVOCATION_TOPIC: &str = "pqc/revocations/v1"; +const DEFAULT_REVOCATION_SYNC_TOPIC: &str = "pqc/revocations/sync/v1"; + +/// Hard DoS containment limit for sync request payloads. +const MAX_SYNC_REQUEST_BYTES: usize = 8 * 1024; // 8 KiB + +#[derive(Debug, Serialize, Deserialize)] +struct FleetPolicySyncRequest { + version: u8, + client_id: String, + current_seq: u64, +} + +impl FleetPolicySyncRequest { + const VERSION_V1: u8 = 1; +} + +#[derive(Debug, Serialize, Deserialize)] +struct RevocationSyncRequest { + version: u8, + client_id: String, + current_seq: u64, +} + +impl RevocationSyncRequest { + const VERSION_V1: u8 = 1; +} + +/// Minimal control-plane service responding to sync requests. +/// +/// This is not a full CA implementation; it only re-publishes signed policy/revocation updates +/// that are already produced by your operational tooling. +pub struct MqttControlPlane { + options: MqttOptions, + policy_topic: String, + policy_sync_topic: String, + revocation_topic: String, + revocation_sync_topic: String, + policy: Option, + revocation: Option, +} + +impl MqttControlPlane { + /// Create a new control-plane responder for a given MQTT broker endpoint. + pub fn new(broker: &str, port: u16, client_id: &str) -> Self { + let mut options = MqttOptions::new(client_id, broker, port); + options.set_clean_session(true); + options.set_keep_alive(Duration::from_secs(15)); + + Self { + options, + policy_topic: DEFAULT_POLICY_TOPIC.to_string(), + policy_sync_topic: DEFAULT_POLICY_SYNC_TOPIC.to_string(), + revocation_topic: DEFAULT_REVOCATION_TOPIC.to_string(), + revocation_sync_topic: DEFAULT_REVOCATION_SYNC_TOPIC.to_string(), + policy: None, + revocation: None, + } + } + + /// Override the fleet policy update topic. + pub fn with_policy_topic(mut self, topic: &str) -> Self { + self.policy_topic = topic.to_string(); + self + } + + /// Override the fleet policy sync request topic. + pub fn with_policy_sync_topic(mut self, topic: &str) -> Self { + self.policy_sync_topic = topic.to_string(); + self + } + + /// Override the revocation update topic. + pub fn with_revocation_topic(mut self, topic: &str) -> Self { + self.revocation_topic = topic.to_string(); + self + } + + /// Override the revocation sync request topic. + pub fn with_revocation_sync_topic(mut self, topic: &str) -> Self { + self.revocation_sync_topic = topic.to_string(); + self + } + + /// Install the latest signed fleet policy update to be served. + pub fn set_policy_update(&mut self, update: FleetPolicyUpdate) { + self.policy = Some(update); + } + + /// Install the latest signed revocation update to be served. + pub fn set_revocation_update(&mut self, update: RevocationUpdate) { + self.revocation = Some(update); + } + + /// Publish the currently configured updates as retained messages (best-effort). + pub fn publish_retained(&self) -> Result<()> { + let (mut client, mut conn) = Client::new(self.options.clone(), 20); + + // Drain connection progress in a background thread. + let handle = std::thread::spawn(move || { + for notification in conn.iter() { + if notification.is_err() { + break; + } + } + }); + + if let Some(policy) = &self.policy { + let payload = serde_json::to_vec(policy) + .map_err(|e| Error::ClientError(format!("FleetPolicyUpdate JSON error: {}", e)))?; + client + .publish(&self.policy_topic, QoS::AtLeastOnce, true, payload) + .map_err(|e| Error::MqttError(e.to_string()))?; + } + if let Some(rev) = &self.revocation { + let payload = serde_json::to_vec(rev) + .map_err(|e| Error::ClientError(format!("RevocationUpdate JSON error: {}", e)))?; + client + .publish(&self.revocation_topic, QoS::AtLeastOnce, true, payload) + .map_err(|e| Error::MqttError(e.to_string()))?; + } + + let _ = client.disconnect(); + let _ = handle.join(); + Ok(()) + } + + /// Run the sync responder loop (blocking). + /// + /// Intended usage: run this in a dedicated gateway/CA process/thread. + pub fn run(mut self) -> Result<()> { + let (mut client, mut conn) = Client::new(self.options.clone(), 50); + client + .subscribe(self.policy_sync_topic.as_str(), QoS::AtLeastOnce) + .map_err(|e| Error::MqttError(e.to_string()))?; + client + .subscribe(self.revocation_sync_topic.as_str(), QoS::AtLeastOnce) + .map_err(|e| Error::MqttError(e.to_string()))?; + + info!( + "control-plane responder online: policy_topic={} revocation_topic={} policy_sync_topic={} revocation_sync_topic={}", + self.policy_topic, self.revocation_topic, self.policy_sync_topic, self.revocation_sync_topic + ); + + for notification in conn.iter() { + let event = match notification { + Ok(e) => e, + Err(e) => { + warn!("control-plane mqtt connection error: {}", e); + continue; + } + }; + if let Event::Incoming(Packet::Publish(publish)) = event { + let _ = self.handle_publish(&mut client, publish); + } + } + + Ok(()) + } + + fn handle_publish(&mut self, client: &mut Client, publish: Publish) -> Result<()> { + let topic = publish.topic.as_str(); + if publish.payload.len() > MAX_SYNC_REQUEST_BYTES { + warn!( + "dropping sync request: payload too large ({} bytes > {}) topic={}", + publish.payload.len(), + MAX_SYNC_REQUEST_BYTES, + topic + ); + return Ok(()); + } + + if topic == self.policy_sync_topic { + let req: FleetPolicySyncRequest = match serde_json::from_slice(&publish.payload) { + Ok(v) => v, + Err(e) => { + warn!("invalid FleetPolicySyncRequest JSON: {}", e); + return Ok(()); + } + }; + if req.version != FleetPolicySyncRequest::VERSION_V1 { + warn!( + "ignoring FleetPolicySyncRequest: unsupported version={} client_id={}", + req.version, req.client_id + ); + return Ok(()); + } + let latest = match &self.policy { + Some(p) => p, + None => return Ok(()), + }; + if req.current_seq < latest.seq { + debug!( + "serving fleet policy to client_id={} current_seq={} latest_seq={}", + req.client_id, req.current_seq, latest.seq + ); + let payload = serde_json::to_vec(latest).map_err(|e| { + Error::ClientError(format!("FleetPolicyUpdate JSON error: {}", e)) + })?; + client + .publish(&self.policy_topic, QoS::AtLeastOnce, true, payload) + .map_err(|e| Error::MqttError(e.to_string()))?; + } + return Ok(()); + } + + if topic == self.revocation_sync_topic { + let req: RevocationSyncRequest = match serde_json::from_slice(&publish.payload) { + Ok(v) => v, + Err(e) => { + warn!("invalid RevocationSyncRequest JSON: {}", e); + return Ok(()); + } + }; + if req.version != RevocationSyncRequest::VERSION_V1 { + warn!( + "ignoring RevocationSyncRequest: unsupported version={} client_id={}", + req.version, req.client_id + ); + return Ok(()); + } + let latest = match &self.revocation { + Some(r) => r, + None => return Ok(()), + }; + if req.current_seq < latest.seq { + debug!( + "serving revocation update to client_id={} current_seq={} latest_seq={}", + req.client_id, req.current_seq, latest.seq + ); + let payload = serde_json::to_vec(latest).map_err(|e| { + Error::ClientError(format!("RevocationUpdate JSON error: {}", e)) + })?; + client + .publish(&self.revocation_topic, QoS::AtLeastOnce, true, payload) + .map_err(|e| Error::MqttError(e.to_string()))?; + } + return Ok(()); + } + + Ok(()) + } +} diff --git a/src/mqtt_secure.rs b/src/mqtt_secure.rs index 7afed4b..3d734b6 100644 --- a/src/mqtt_secure.rs +++ b/src/mqtt_secure.rs @@ -6,7 +6,6 @@ use crate::security::audit::{AuditLog, AuditLogger, ChainedAuditLogger, Security use crate::security::hybrid; use crate::security::keystore::{KeyStore, PeerKeys}; use crate::security::metrics::SecurityMetrics; -use crate::security::monotonic::{seal_u64, unseal_u64}; use crate::security::policy::FleetPolicyUpdate; use crate::security::provider::{SecurityProvider, SoftwareSecurityProvider}; use crate::security::revocation::RevocationUpdate; @@ -112,6 +111,12 @@ const DEFAULT_GLOBAL_DECRYPT_BUDGET_REFILL_PER_SEC: u32 = 40; const DEFAULT_BUDGET_MAX_PEERS: usize = 10_000; +/// Maximum bytes read from a keystore file when computing integrity digests. +const MAX_KEYSTORE_FILE_BYTES: usize = 8 * 1024 * 1024; // 8 MiB + +const KEYSTORE_META_VERSION_V1: u8 = 1; +const KEYSTORE_META_BYTES_V1: usize = 1 + 8 + 32; + /// Fixed-rate token bucket. #[derive(Debug, Clone)] struct TokenBucket { @@ -218,6 +223,35 @@ fn storage_id_for(client_id: &str) -> String { } } +fn encode_keystore_meta_v1(generation: u64, hash: [u8; 32]) -> Vec { + let mut out = Vec::with_capacity(KEYSTORE_META_BYTES_V1); + out.push(KEYSTORE_META_VERSION_V1); + out.extend_from_slice(&generation.to_be_bytes()); + out.extend_from_slice(&hash); + out +} + +fn decode_keystore_meta_v1(blob: &[u8]) -> Result<(u64, [u8; 32])> { + if blob.len() != KEYSTORE_META_BYTES_V1 { + return Err(Error::CryptoError(format!( + "Invalid keystore meta length: {}", + blob.len() + ))); + } + if blob[0] != KEYSTORE_META_VERSION_V1 { + return Err(Error::ProtocolError(format!( + "Unsupported keystore meta version: {}", + blob[0] + ))); + } + let mut gen_bytes = [0u8; 8]; + gen_bytes.copy_from_slice(&blob[1..9]); + let generation = u64::from_be_bytes(gen_bytes); + let mut hash = [0u8; 32]; + hash.copy_from_slice(&blob[9..]); + Ok((generation, hash)) +} + /// Secure MQTT client using post-quantum cryptography pub struct SecureMqttClient { options: MqttOptions, @@ -287,6 +321,18 @@ pub struct SecureMqttClient { policy_topic: String, policy_sync_topic: String, fleet_policy: Option, + /// Sealed monotonic floor for the fleet policy sequence. + /// + /// This is meaningful only when the `SecurityProvider` is backed by rollback-resistant + /// storage (TPM NV / HSM / TEE monotonic storage / WORM remote append-only). + /// + /// Used to detect policy rollback across restarts and fail closed under partitions. + fleet_policy_seq_floor: u64, + /// Sealed monotonic floor for the revocation sequence (CRL/denylist updates). + /// + /// This provides rollback resistance for emergency revocations under the same assumptions as + /// `fleet_policy_seq_floor`. + revocation_seq_floor: u64, // Asymmetric-cost DoS budgets (token buckets). sig_verify_budget: TokenBucketMap, @@ -1314,6 +1360,8 @@ impl SecureMqttClient { policy_topic: DEFAULT_POLICY_TOPIC.to_string(), policy_sync_topic: DEFAULT_POLICY_SYNC_TOPIC.to_string(), fleet_policy: None, + fleet_policy_seq_floor: 0, + revocation_seq_floor: 0, sig_verify_budget: TokenBucketMap::new( DEFAULT_SIGVERIFY_BUDGET_CAPACITY, DEFAULT_SIGVERIFY_BUDGET_REFILL_PER_SEC, @@ -1356,13 +1404,29 @@ impl SecureMqttClient { // Initialize keystore anti-rollback binding after any legacy migration flush. client.init_keystore_anti_rollback()?; + client.load_revocation_seq_floor()?; + client.load_fleet_policy_seq_floor()?; if let Some(policy) = client.load_sealed_fleet_policy()? { if let Some(ca_pk) = client.trust_anchor_ca_sig_pk.clone() { if let Err(e) = policy.verify(&ca_pk, &client.policy_topic) { warn!("Ignoring sealed fleet policy: {}", e); } else { - client.apply_fleet_policy(policy); + if policy.seq < client.fleet_policy_seq_floor { + warn!( + "Ignoring sealed fleet policy: seq rollback detected (policy_seq={} < sealed_floor={})", + policy.seq, client.fleet_policy_seq_floor + ); + } else { + // Crash window repair: policy was sealed but the monotonic floor wasn't advanced. + let label = client.fleet_policy_seq_label(); + let _ = client + .provider + .sealed_monotonic_u64_advance_to(&label, policy.seq)?; + client.fleet_policy_seq_floor = + client.fleet_policy_seq_floor.max(policy.seq); + client.apply_fleet_policy(policy); + } } } } @@ -1469,6 +1533,8 @@ impl SecureMqttClient { policy_topic: DEFAULT_POLICY_TOPIC.to_string(), policy_sync_topic: DEFAULT_POLICY_SYNC_TOPIC.to_string(), fleet_policy: None, + fleet_policy_seq_floor: 0, + revocation_seq_floor: 0, sig_verify_budget: TokenBucketMap::new( DEFAULT_SIGVERIFY_BUDGET_CAPACITY, DEFAULT_SIGVERIFY_BUDGET_REFILL_PER_SEC, @@ -1508,13 +1574,28 @@ impl SecureMqttClient { // Anchor keystore anti-rollback counter. client.init_keystore_anti_rollback()?; + client.load_revocation_seq_floor()?; + client.load_fleet_policy_seq_floor()?; if let Some(policy) = client.load_sealed_fleet_policy()? { if let Some(ca_pk) = client.trust_anchor_ca_sig_pk.clone() { if let Err(e) = policy.verify(&ca_pk, &client.policy_topic) { warn!("Ignoring sealed fleet policy: {}", e); } else { - client.apply_fleet_policy(policy); + if policy.seq < client.fleet_policy_seq_floor { + warn!( + "Ignoring sealed fleet policy: seq rollback detected (policy_seq={} < sealed_floor={})", + policy.seq, client.fleet_policy_seq_floor + ); + } else { + let label = client.fleet_policy_seq_label(); + let _ = client + .provider + .sealed_monotonic_u64_advance_to(&label, policy.seq)?; + client.fleet_policy_seq_floor = + client.fleet_policy_seq_floor.max(policy.seq); + client.apply_fleet_policy(policy); + } } } } @@ -1626,26 +1707,32 @@ impl SecureMqttClient { fn next_session_seq(&self, peer_id: &str) -> Result { let label = self.session_out_seq_label(peer_id); - let current = unseal_u64(&self.provider, &label)?.unwrap_or(0); - let next = current.saturating_add(1).max(1); - seal_u64(&self.provider, &label, next)?; - Ok(next) + self.provider.sealed_monotonic_u64_increment(&label) } fn last_inbound_session_seq(&self, peer_id: &str) -> Result { let label = self.session_in_seq_label(peer_id); - Ok(unseal_u64(&self.provider, &label)?.unwrap_or(0)) + Ok(self.provider.sealed_monotonic_u64_get(&label)?.unwrap_or(0)) } fn persist_inbound_session_seq(&self, peer_id: &str, seq: u64) -> Result<()> { let label = self.session_in_seq_label(peer_id); - seal_u64(&self.provider, &label, seq) + let _ = self.provider.sealed_monotonic_u64_advance_to(&label, seq)?; + Ok(()) } fn fleet_policy_label(&self) -> String { format!("pqc-iiot:fleet-policy:v1:{}", self.storage_id) } + fn fleet_policy_seq_label(&self) -> String { + format!("pqc-iiot:fleet-policy-seq:v1:{}", self.storage_id) + } + + fn revocation_seq_label(&self) -> String { + format!("pqc-iiot:revocation-seq:v1:{}", self.storage_id) + } + fn load_sealed_fleet_policy(&self) -> Result> { let label = self.fleet_policy_label(); match self.provider.unseal_data(&label) { @@ -1667,6 +1754,35 @@ impl SecureMqttClient { self.provider.seal_data(&label, &blob) } + fn load_fleet_policy_seq_floor(&mut self) -> Result<()> { + let label = self.fleet_policy_seq_label(); + let sealed = self.provider.sealed_monotonic_u64_get(&label)?.unwrap_or(0); + self.fleet_policy_seq_floor = self.fleet_policy_seq_floor.max(sealed); + Ok(()) + } + + fn load_revocation_seq_floor(&mut self) -> Result<()> { + let label = self.revocation_seq_label(); + let sealed = self.provider.sealed_monotonic_u64_get(&label)?.unwrap_or(0); + self.revocation_seq_floor = self.revocation_seq_floor.max(sealed); + + let file_seq = self.keystore.revocation_seq(); + if file_seq > self.revocation_seq_floor { + // Crash window repair / upgrade path: keystore has advanced but the sealed floor hasn't. + let _ = self + .provider + .sealed_monotonic_u64_advance_to(&label, file_seq)?; + self.revocation_seq_floor = file_seq; + } else if file_seq < self.revocation_seq_floor { + warn!( + "revocation seq rollback detected: file_seq={} sealed_floor={} (storage_id={})", + file_seq, self.revocation_seq_floor, self.storage_id + ); + } + + Ok(()) + } + fn apply_fleet_policy(&mut self, policy: FleetPolicyUpdate) { self.strict_mode = policy.strict_mode; self.attestation_required = policy.attestation_required; @@ -1694,6 +1810,26 @@ impl SecureMqttClient { self.fleet_policy = Some(policy); } + fn ensure_fleet_policy_caught_up(&mut self, op: &str) -> Result<()> { + let floor = self.fleet_policy_seq_floor; + if floor == 0 { + return Ok(()); + } + let have = self.fleet_policy.as_ref().map(|p| p.seq).unwrap_or(0); + if have < floor { + warn!( + "fleet policy state behind sealed floor: have_seq={} < floor_seq={} (op={})", + have, floor, op + ); + let _ = self.maybe_request_fleet_policy_sync(); + return Err(Error::ClientError(format!( + "Fleet policy state behind sealed floor (have_seq={} < floor_seq={}); refusing {}", + have, floor, op + ))); + } + Ok(()) + } + fn ensure_storage_assurance(&self, op: &str) -> Result<()> { if self.require_rollback_resistant_storage && !self.provider.is_rollback_resistant_storage() { @@ -1707,10 +1843,11 @@ impl SecureMqttClient { } fn ensure_revocation_caught_up(&mut self, op: &str) -> Result<()> { - let min = match self.min_revocation_seq { - Some(v) => v, - None => return Ok(()), - }; + let min_policy = self.min_revocation_seq.unwrap_or(0); + let min = std::cmp::max(min_policy, self.revocation_seq_floor); + if min == 0 { + return Ok(()); + } let have = self.keystore.revocation_seq(); if have < min { warn!( @@ -1805,6 +1942,7 @@ impl SecureMqttClient { } fn ensure_fleet_policy_fresh(&mut self, op: &str) -> Result<()> { + self.ensure_fleet_policy_caught_up(op)?; self.ensure_storage_assurance(op)?; self.ensure_revocation_caught_up(op)?; if self.is_fleet_policy_stale()? { @@ -1818,6 +1956,18 @@ impl SecureMqttClient { } fn drop_if_fleet_policy_stale(&mut self, op: &str) -> Result { + let floor = self.fleet_policy_seq_floor; + if floor > 0 { + let have = self.fleet_policy.as_ref().map(|p| p.seq).unwrap_or(0); + if have < floor { + let _ = self.maybe_request_fleet_policy_sync(); + warn!( + "Dropping {}: fleet policy behind sealed floor (have_seq={} < floor_seq={})", + op, have, floor + ); + return Ok(true); + } + } if self.is_fleet_policy_stale()? { let _ = self.maybe_request_fleet_policy_sync(); warn!("Dropping {}: fleet policy stale (ttl exceeded)", op); @@ -1856,7 +2006,15 @@ impl SecureMqttClient { // Bind the persisted keystore generation to a sealed counter behind the provider. // In TPM/HSM-backed providers, this becomes an anti-rollback primitive for replay windows. let label = self.keystore_generation_label(); - seal_u64(&self.provider, &label, gen)?; + let _ = self.provider.sealed_monotonic_u64_advance_to(&label, gen)?; + + // Bind the file bytes to a sealed digest for tamper detection within the same generation. + let hash = self + .keystore_file_hash()? + .ok_or_else(|| Error::ClientError("Keystore missing after flush".into()))?; + let meta_label = self.keystore_meta_label(); + let meta_bytes = encode_keystore_meta_v1(gen, hash); + self.provider.seal_data(&meta_label, &meta_bytes)?; Ok(()) } @@ -1875,15 +2033,32 @@ impl SecureMqttClient { format!("pqc-iiot:keystore-gen:v1:{}", self.storage_id) } + fn keystore_meta_label(&self) -> String { + format!("pqc-iiot:keystore-meta:v1:{}", self.storage_id) + } + + fn keystore_file_hash(&self) -> Result> { + let path = self.keystore_path(); + if !path.exists() { + return Ok(None); + } + let blob = AtomicFileStore::read_with_limit(&path, MAX_KEYSTORE_FILE_BYTES)?; + let digest = Sha256::digest(&blob); + let mut out = [0u8; 32]; + out.copy_from_slice(&digest); + Ok(Some(out)) + } + fn init_keystore_anti_rollback(&mut self) -> Result<()> { let label = self.keystore_generation_label(); let file_gen = self.keystore.generation(); - let sealed_gen = unseal_u64(&self.provider, &label)?; + let sealed_gen = self.provider.sealed_monotonic_u64_get(&label)?; + let mut crash_repair = false; match sealed_gen { None => { // First run (or legacy upgrade): anchor the counter to the current on-disk generation. - seal_u64(&self.provider, &label, file_gen)?; + self.provider.seal_data(&label, &file_gen.to_be_bytes())?; } Some(sealed) => { if file_gen < sealed { @@ -1895,7 +2070,8 @@ impl SecureMqttClient { if file_gen > sealed { // Accept a +1 mismatch as a crash window repair; anything larger is suspicious. if file_gen == sealed.saturating_add(1) { - seal_u64(&self.provider, &label, file_gen)?; + self.provider.seal_data(&label, &file_gen.to_be_bytes())?; + crash_repair = true; } else { return Err(Error::ClientError(format!( "Keystore generation mismatch: file_gen={} sealed_gen={}", @@ -1905,6 +2081,52 @@ impl SecureMqttClient { } } } + + // Bind the keystore file contents to a sealed digest to detect tampering within the same generation. + let meta_label = self.keystore_meta_label(); + let meta = match self.provider.unseal_data(&meta_label) { + Ok(blob) => Some(blob), + Err(Error::IoError(e)) if e.kind() == std::io::ErrorKind::NotFound => None, + Err(e) => return Err(e), + }; + + let file_hash = self.keystore_file_hash()?; + match (meta, file_hash) { + (None, None) => { + // No keystore on disk yet; nothing to bind. + } + (Some(_), None) => { + // Sealed meta exists but file is missing: treat as tamper/destructive rollback. + return Err(Error::ClientError( + "Keystore missing but sealed meta present (possible tamper/rollback)".into(), + )); + } + (None, Some(hash)) => { + // Upgrade path: keystore exists but no sealed meta yet. Anchor it now. + let bytes = encode_keystore_meta_v1(file_gen, hash); + self.provider.seal_data(&meta_label, &bytes)?; + } + (Some(blob), Some(hash)) => { + let (meta_gen, meta_hash) = decode_keystore_meta_v1(&blob)?; + if meta_gen != file_gen { + if crash_repair && meta_gen == file_gen.saturating_sub(1) { + // Crash window repair: file advanced but meta didn't get sealed. Re-anchor. + let bytes = encode_keystore_meta_v1(file_gen, hash); + self.provider.seal_data(&meta_label, &bytes)?; + } else { + return Err(Error::ClientError(format!( + "Keystore meta generation mismatch: meta_gen={} file_gen={}", + meta_gen, file_gen + ))); + } + } else if meta_hash != hash { + return Err(Error::ClientError( + "Keystore tamper detected (sealed digest mismatch)".into(), + )); + } + } + } + Ok(()) } @@ -3882,7 +4104,7 @@ impl SecureMqttClient { return Ok(()); } - let current_seq = self.keystore.revocation_seq(); + let current_seq = std::cmp::max(self.keystore.revocation_seq(), self.revocation_seq_floor); if update.seq <= current_seq { debug!( "Ignoring revocation update: seq={} <= current_seq={}", @@ -3905,6 +4127,11 @@ impl SecureMqttClient { // Persist immediately: revocations are emergency policy updates and must survive restarts. self.flush_keystore()?; + let label = self.revocation_seq_label(); + let _ = self + .provider + .sealed_monotonic_u64_advance_to(&label, update.seq)?; + self.revocation_seq_floor = self.revocation_seq_floor.max(update.seq); self.persist_manager.notify_flushed(); Ok(()) @@ -3937,7 +4164,10 @@ impl SecureMqttClient { return Ok(()); } - let current_seq = self.fleet_policy.as_ref().map(|p| p.seq).unwrap_or(0); + let current_seq = std::cmp::max( + self.fleet_policy.as_ref().map(|p| p.seq).unwrap_or(0), + self.fleet_policy_seq_floor, + ); if update.seq <= current_seq { debug!( "Ignoring fleet policy update: seq={} <= current_seq={}", @@ -3948,6 +4178,13 @@ impl SecureMqttClient { // Persist before applying so a crash after apply doesn't revert to an older policy. self.seal_fleet_policy(&update)?; + // Advance the sealed monotonic floor (anti-rollback). Hardware providers should back this + // with a TPM NV counter / TEE monotonic store. + let label = self.fleet_policy_seq_label(); + let _ = self + .provider + .sealed_monotonic_u64_advance_to(&label, update.seq)?; + self.fleet_policy_seq_floor = self.fleet_policy_seq_floor.max(update.seq); self.apply_fleet_policy(update); Ok(()) diff --git a/src/security/provider.rs b/src/security/provider.rs index 09c54d4..2c2aba6 100644 --- a/src/security/provider.rs +++ b/src/security/provider.rs @@ -103,6 +103,57 @@ pub trait SecurityProvider: Send + Sync { /// Unseal data from persistent storage. fn unseal_data(&self, label: &str) -> Result>; + /// Read a sealed monotonic `u64` counter from the provider. + /// + /// This is an explicit primitive used to model monotonic state that must survive restarts: + /// - secure time floors + /// - replay windows / sequence counters + /// - policy/revocation sequence gates + /// + /// Security semantics: + /// - The counter is only rollback-resistant when `is_rollback_resistant_storage() == true`. + /// - The default implementation persists the counter via `seal_data/unseal_data`. + /// Hardware providers should override these methods to use TPM NV counters / TEE monotonic + /// storage when available. + fn sealed_monotonic_u64_get(&self, label: &str) -> Result> { + match self.unseal_data(label) { + Ok(blob) => { + if blob.len() != 8 { + return Err(Error::CryptoError(format!( + "Invalid sealed u64 length for {}: {}", + label, + blob.len() + ))); + } + let mut buf = [0u8; 8]; + buf.copy_from_slice(&blob); + Ok(Some(u64::from_be_bytes(buf))) + } + Err(Error::IoError(e)) if e.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(e) => Err(e), + } + } + + /// Advance a sealed monotonic counter if `candidate > current`. + /// + /// Returns `Ok(true)` when the counter advanced, `Ok(false)` otherwise. + fn sealed_monotonic_u64_advance_to(&self, label: &str, candidate: u64) -> Result { + let current = self.sealed_monotonic_u64_get(label)?.unwrap_or(0); + if candidate > current { + self.seal_data(label, &candidate.to_be_bytes())?; + return Ok(true); + } + Ok(false) + } + + /// Increment a sealed monotonic counter by 1, persist it, and return the new value. + fn sealed_monotonic_u64_increment(&self, label: &str) -> Result { + let current = self.sealed_monotonic_u64_get(label)?.unwrap_or(0); + let next = current.saturating_add(1).max(1); + self.seal_data(label, &next.to_be_bytes())?; + Ok(next) + } + /// Generate an Attestation Quote. fn generate_quote( &self, diff --git a/src/security/time.rs b/src/security/time.rs index 559f7d7..27c11de 100644 --- a/src/security/time.rs +++ b/src/security/time.rs @@ -1,4 +1,3 @@ -use crate::security::monotonic::{seal_u64, unseal_u64}; use crate::security::provider::SecurityProvider; use crate::Result; use log::{info, warn}; @@ -47,12 +46,12 @@ impl SecureTimeFloor { /// and seals it immediately. pub fn load(provider: Arc, label: impl Into) -> Result { let label = label.into(); - let persisted = unseal_u64(&provider, &label)?; + let persisted = provider.sealed_monotonic_u64_get(&label)?; let floor = persisted.unwrap_or_else(system_unix_seconds); // If this is the first run (no persisted floor), anchor immediately. if persisted.is_none() { - seal_u64(&provider, &label, floor)?; + provider.seal_data(&label, &floor.to_be_bytes())?; info!("secure time floor initialized: {}={}", label, floor); } @@ -86,12 +85,14 @@ impl SecureTimeFloor { if now > self.floor_unix_s { self.floor_unix_s = now; if self.last_persist.elapsed() >= self.persist_interval { - seal_u64(&self.provider, &self.label, self.floor_unix_s)?; + let _ = self + .provider + .sealed_monotonic_u64_advance_to(&self.label, self.floor_unix_s)?; self.last_persist = Instant::now(); } } - Ok(now) + Ok(self.floor_unix_s) } /// Returns the assurance level of the monotonic floor based on the provider backend. @@ -105,7 +106,9 @@ impl SecureTimeFloor { /// Force persistence of the current floor. pub fn flush(&mut self) -> Result<()> { - seal_u64(&self.provider, &self.label, self.floor_unix_s)?; + let _ = self + .provider + .sealed_monotonic_u64_advance_to(&self.label, self.floor_unix_s)?; self.last_persist = Instant::now(); Ok(()) } diff --git a/tests/control_plane_sync.rs b/tests/control_plane_sync.rs new file mode 100644 index 0000000..faf4d77 --- /dev/null +++ b/tests/control_plane_sync.rs @@ -0,0 +1,84 @@ +use pqc_iiot::crypto::traits::PqcSignature; +use pqc_iiot::mqtt_control_plane::MqttControlPlane; +use pqc_iiot::mqtt_secure::SecureMqttClient; +use pqc_iiot::security::policy::FleetPolicyUpdate; +use pqc_iiot::Falcon; +use std::thread; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +mod common; + +#[test] +fn control_plane_serves_policy_sync_requests() -> Result<(), Box> { + let port = 29860; + common::start_mqtt_broker(port); + let suffix: u32 = rand::random(); + + let falcon = Falcon::new(); + let (ca_pk, ca_sk) = falcon.generate_keypair().expect("ca keygen"); + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let mut policy = FleetPolicyUpdate { + version: FleetPolicyUpdate::VERSION_V2, + seq: 1, + issued_at: now, + require_rollback_resistant_storage: false, + strict_mode: false, + attestation_required: false, + require_sessions: true, + min_revocation_seq: None, + sig_verify_budget: None, + decrypt_budget: None, + ttl_secs: None, + session_rekey_after_msgs: None, + session_rekey_after_secs: None, + signature: Vec::new(), + }; + policy.sign(&ca_sk, "pqc/policy/v1")?; + + // Spawn a minimal control-plane responder that will serve the policy when clients request sync. + let mut cp = MqttControlPlane::new("localhost", port, &format!("cp_{}", suffix)); + cp.set_policy_update(policy); + thread::spawn(move || { + let _ = cp.run(); + }); + + // Client that pins the CA key so it can verify policy updates. + let client_id = format!("alice_cp_{}", suffix); + let mut alice = SecureMqttClient::new("localhost", port, &client_id)? + .with_trust_anchor_ca_sig_pk(ca_pk) + .with_strict_mode(false); + alice.bootstrap()?; + + let topic = format!("secure/cp_sync_test_{}", suffix); + let target_id = format!("bob_cp_{}", suffix); + + // Wait until the policy applies; once it does, publish_encrypted must fail closed without a session. + let start = Instant::now(); + while start.elapsed() < Duration::from_secs(3) { + alice.poll(|_, _| {})?; + let err = alice.publish_encrypted(&topic, b"X", &target_id); + if let Err(e) = err { + let msg = format!("{e:?}"); + if msg.contains("Fleet policy requires sessions") { + return Ok(()); + } + } + thread::sleep(Duration::from_millis(20)); + } + + let err = alice + .publish_encrypted(&topic, b"BLOCKED", &target_id) + .expect_err("expected policy to require sessions via sync responder"); + let msg = format!("{err:?}"); + assert!( + msg.contains("Fleet policy requires sessions"), + "unexpected error: {msg}" + ); + + Ok(()) +} diff --git a/tests/mqtt_invariants.rs b/tests/mqtt_invariants.rs index 92ad2ea..e23e21b 100644 --- a/tests/mqtt_invariants.rs +++ b/tests/mqtt_invariants.rs @@ -7,6 +7,12 @@ use std::time::{Duration, Instant}; mod common; +fn sealed_blob_path(label: &str) -> std::path::PathBuf { + // Mirror `src/security/provider.rs::sealed_blob_path` (kept private). + let digest = Sha256::digest(label.as_bytes()); + std::path::Path::new("pqc-data").join(format!("sealed_{}.bin", hex::encode(digest))) +} + fn mqtt_msg_digest(sender_id: &str, topic: &str, encrypted_blob: &[u8]) -> [u8; 32] { let mut hasher = Sha256::new(); hasher.update(b"pqc-iiot:mqtt-msg:v1"); @@ -569,6 +575,124 @@ fn mqtt_policy_v2_ttl_stale_blocks_new_handshakes() -> Result<(), Box Result<(), Box> { + let port = 29850; + common::start_mqtt_broker(port); + let suffix: u32 = rand::random(); + + let key_prefix = format!("pqc/policy_rb_keys_{}/", suffix); + let alice_id = format!("alice_policy_rb_{}", suffix); + let bob_id = format!("bob_policy_rb_{}", suffix); + + // Mesh CA used to sign fleet policy updates. + let falcon = Falcon::new(); + let (ca_pk, ca_sk) = falcon.generate_keypair().expect("ca keygen"); + let ca_pk_reboot = ca_pk.clone(); + + let mut alice = SecureMqttClient::new("localhost", port, &alice_id)? + .with_strict_mode(false) + .with_key_prefix(&key_prefix) + .with_trust_anchor_ca_sig_pk(ca_pk.clone()); + let mut bob = SecureMqttClient::new("localhost", port, &bob_id)? + .with_strict_mode(false) + .with_key_prefix(&key_prefix) + .with_trust_anchor_ca_sig_pk(ca_pk); + + alice.bootstrap()?; + bob.bootstrap()?; + + // Apply a policy update seq=2, capture its sealed blob bytes. + let mut policy_2 = pqc_iiot::security::policy::FleetPolicyUpdate { + version: pqc_iiot::security::policy::FleetPolicyUpdate::VERSION_V2, + seq: 2, + issued_at: 1, + require_rollback_resistant_storage: false, + strict_mode: false, + attestation_required: false, + require_sessions: true, + min_revocation_seq: None, + sig_verify_budget: None, + decrypt_budget: None, + ttl_secs: None, + session_rekey_after_msgs: None, + session_rekey_after_secs: None, + signature: vec![], + }; + policy_2.sign(&ca_sk, "pqc/policy/v1")?; + publish_raw( + "pqc/policy/v1", + port, + serde_json::to_vec(&policy_2).expect("policy_2 json"), + )?; + + let start = Instant::now(); + while start.elapsed() < Duration::from_secs(2) { + alice.poll(|_, _| {})?; + bob.poll(|_, _| {})?; + std::thread::sleep(Duration::from_millis(10)); + } + + let policy_label = format!("pqc-iiot:fleet-policy:v1:{}", alice_id); + let policy_path = sealed_blob_path(&policy_label); + let sealed_policy_seq2 = std::fs::read(&policy_path).expect("sealed policy seq2 missing"); + + // Apply a newer policy update seq=3 (advances the monotonic seq floor), then roll back only the policy blob. + let mut policy_3 = pqc_iiot::security::policy::FleetPolicyUpdate { + version: pqc_iiot::security::policy::FleetPolicyUpdate::VERSION_V2, + seq: 3, + issued_at: 2, + require_rollback_resistant_storage: false, + strict_mode: false, + attestation_required: false, + require_sessions: true, + min_revocation_seq: None, + sig_verify_budget: None, + decrypt_budget: None, + ttl_secs: None, + session_rekey_after_msgs: None, + session_rekey_after_secs: None, + signature: vec![], + }; + policy_3.sign(&ca_sk, "pqc/policy/v1")?; + publish_raw( + "pqc/policy/v1", + port, + serde_json::to_vec(&policy_3).expect("policy_3 json"), + )?; + + let start = Instant::now(); + while start.elapsed() < Duration::from_secs(2) { + alice.poll(|_, _| {})?; + bob.poll(|_, _| {})?; + std::thread::sleep(Duration::from_millis(10)); + } + + // Attack simulation: roll back only the fleet-policy blob (the monotonic floor is in a separate sealed label). + std::fs::write(&policy_path, &sealed_policy_seq2).expect("rollback policy blob write"); + + // Simulate a reboot: new client instance loads the sealed seq floor first and must refuse operations. + drop(alice); + let mut rebooted = SecureMqttClient::new("localhost", port, &alice_id)? + .with_strict_mode(false) + .with_key_prefix(&key_prefix) + .with_trust_anchor_ca_sig_pk(ca_pk_reboot); + + // Connect so `ensure_connected()` passes, but don't `poll()` yet (partition simulation / no catch-up applied). + rebooted.bootstrap()?; + + let err = rebooted + .initiate_session(&bob_id) + .expect_err("expected fail-closed under policy rollback"); + let msg = format!("{err:?}"); + assert!( + msg.contains("behind sealed floor"), + "unexpected error message: {msg}" + ); + + Ok(()) +} + #[test] fn mqtt_replay_window_accepts_out_of_order_within_window() -> Result<(), Box> {