diff --git a/README.md b/README.md index 4590d61..b6f86f7 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,10 @@ A Rust crate for post-quantum cryptography in Industrial IoT systems. Designed for **FIPS 140-3** and **IEC 62443** compliance in Critical Infrastructure: +### Security Contract + +The invariant-level security contract (threat model assumptions, trust boundaries, fail-closed gates) lives in `SECURITY_INVARIANTS.md`. Treat it as the reference for adversarial correctness and as the place where regressions must be codified as tests. + ### 🛡️ Mathematically Proven Security ("World Class") ![Uncrashable](https://img.shields.io/badge/Formal_Verification-Kani-success) ![Fuzzing](https://img.shields.io/badge/Fuzzing-LibFuzzer-coverage) diff --git a/SECURITY_INVARIANTS.md b/SECURITY_INVARIANTS.md new file mode 100644 index 0000000..5fcf451 --- /dev/null +++ b/SECURITY_INVARIANTS.md @@ -0,0 +1,224 @@ +# Security Invariants (Contract) + +This repository targets adversarial Industrial IoT (IIoT) deployments: untrusted networks, untrusted brokers, long partitions, and periodic endpoint compromise. + +This document is a **security contract**. Any change that weakens an invariant MUST either: + +- be explicitly justified (threat model change), and +- ship with new/updated regression tests proving the intended behavior. + +If you cannot test an invariant (e.g., hardware-backed anti-rollback), you must make the assumption explicit and **fail closed** for the operations that depend on it. + +--- + +## 0. Threat Model and Trust Boundaries + +### Actors + +- **Device**: endpoint running `pqc-iiot`. +- **Peer**: another device/gateway. +- **Broker** (MQTT) / **Network** (UDP/CoAP): fully adversarial. +- **Control plane**: CA/gateway service that publishes signed fleet policy and revocations. +- **Local storage**: may be writable and rollbackable under attacker with host access. + +### Adversary capabilities + +- Network MITM: replay, reorder, inject, drop, delay. +- Broker compromise: topic re-routing, retained message substitution, message amplification. +- Endpoint compromise (transient): attacker reads process memory and filesystem; may later lose access. +- Filesystem rollback: attacker restores previous sealed blobs and keystore files. + +### Non-goals (explicit) + +- A software-only provider is **not** a root-of-trust. It can provide best-effort persistence and encryption-at-rest, but it cannot prevent rollback by an attacker with filesystem write access. +- The `SoftwareTpm` model is a simulation for functional flows; it must not be treated as a TPM-grade attestation root. + +--- + +## 1. Provider / Persistence Invariants (Anti-Rollback) + +### 1.1 Rollback resistance is an explicit capability + +**Invariant:** Any security decision that relies on monotonic state across restarts MUST be gated by `SecurityProvider::is_rollback_resistant_storage() == true`. + +Rationale: Without a sealed monotonic counter (TPM NV / TEE counter / HSM monotonic storage / remote append-only service), an attacker can roll back state by restoring old blobs. + +Concrete gates in the MQTT stack: + +- Fleet policy sequence floors: `pqc-iiot:fleet-policy-seq:v1:` +- Revocation sequence floors: `pqc-iiot:revocation-seq:v1:` +- Secure time floor: `pqc-iiot:time-floor:v1:` + +**Fail-closed rule:** If fleet policy requires rollback-resistant storage, and the provider is not rollback-resistant, the client MUST fail closed for: + +- new session establishment / rekey +- encrypted sends (when policy requires sessions) +- acceptance of policy updates that enforce monotonic security gates + +Regression coverage: + +- `tests/mqtt_invariants.rs::mqtt_policy_v2_fails_closed_without_rollback_resistant_storage` +- `tests/mqtt_invariants.rs::mqtt_policy_rollback_detected_via_sealed_seq_floor` + +### 1.2 Sealed monotonic counters are monotonic by construction + +**Invariant:** A sealed monotonic counter MUST never decrease. + +Implementation note: + +- Software providers implement monotonic counters via sealed blobs (best-effort, rollbackable). +- Hardware providers must override monotonic counter operations to use rollback-resistant primitives. + +--- + +## 2. Fleet Policy / Revocation Invariants (Partitions) + +### 2.1 Policy update stream is signed, monotonic, and partition-aware + +**Invariant:** Fleet policy updates (`FleetPolicyUpdate`) are accepted only if: + +- signature verifies under the pinned CA key, and +- `seq` increases monotonically (local state), and +- rollback is detected via a sealed monotonic floor when rollback-resistant storage is available. + +**Invariant:** Revocation updates (`RevocationUpdate`) are accepted only if: + +- signature verifies under the pinned CA key, and +- `seq` increases monotonically, and +- rollback is detected via a sealed monotonic floor when rollback-resistant storage is available. + +### 2.2 Operational semantics under long partitions must be explicit + +This repository separates operation classes: + +- **High-risk**: session establishment, key rollover, accepting new trust material. +- **Medium-risk**: sending encrypted application data (telemetry/commands) depending on policy. +- **Low-risk**: local logging/metrics, receiving already-established session traffic (bounded). + +**Invariant:** When policy is stale (TTL exceeded under secure time), the client MUST fail closed for high-risk operations. + +Implementation note: + +- TTL enforcement is only meaningful if `SecureTimeFloor` is rollback-resistant; otherwise it becomes best-effort DoS signaling. + +**Invariant:** When a policy requires a minimum revocation sequence, the client MUST fail closed until caught up. + +Regression coverage: + +- `tests/control_plane_sync.rs::control_plane_serves_policy_sync_requests` +- `tests/mqtt_invariants.rs::mqtt_policy_v2_fails_closed_when_revocation_seq_behind` +- `tests/mqtt_invariants.rs::mqtt_policy_v2_ttl_stale_blocks_new_handshakes` + +--- + +## 3. MQTT Protocol Invariants + +### 3.1 Key announcements bind peer identity and topic context + +**Invariant:** Key announcements are signed over a canonical payload that includes `peer_id` and key material. A signed announcement MUST NOT be replayable under another peer id/topic. + +Regression coverage: + +- `tests/integration_tests.rs::test_key_announcement_binds_peer_id` +- `tests/integration_tests.rs::test_malicious_key_announcement_rejected` + +### 3.2 Domain separation for signatures + +**Invariant:** Every signed message type MUST include explicit domain separation and bind the relevant routing context (MQTT topic, sender id). + +Examples: + +- Key announcements: `pqc-iiot:key-announce:v2` +- Encrypted MQTT v1 messages: digest binds `sender_id` + `topic` + blob under `pqc-iiot:mqtt-msg:v1` +- Session control messages: domain-separated payloads bind initiator/responder ids and target topics. + +Regression coverage: + +- `tests/mqtt_invariants.rs::mqtt_signature_binds_topic` + +### 3.3 Anti-replay: cheap reject path, then bounded out-of-order acceptance + +**Invariant:** The receiver MUST reject replays deterministically. + +There are two replay domains: + +1) **v1 per-message hybrid encryption**: monotonically increasing `sequence_number` with a bounded replay window. +2) **session traffic**: per-chain message numbers + bounded skipped-key window; (DH-ratchet) chain transitions must not allow rollback. + +### 3.4 Asymmetric-cost DoS containment + +**Invariant:** The implementation MUST provide a cheap reject path before expensive crypto: + +- size limits before parsing (`serde_json::from_slice`) +- peer id validation before allocation +- token bucket budgets before signature verification / KEM / decrypt +- global peer budget caps to prevent cardinality explosions + +Regression expectations: + +- bounded memory growth (no unbounded HashMap growth from wire-controlled IDs) +- bounded CPU usage under sustained invalid traffic (rate limiting emits drops) + +--- + +## 4. MQTT Sessions Invariants (Forward Secrecy + PCS) + +### 4.1 Session handshake is authenticated and binds identities + +**Invariant:** Session establishment is authenticated by long-term Falcon identities and bound to: + +- initiator id, responder id +- handshake topics +- session id and per-peer monotonic `session_seq` + +This prevents broker-mediated session splicing and downgrade/replay of old session init messages. + +### 4.2 PCS requires a DH/KEM-driven ratchet, not only a symmetric chain + +**Invariant:** A symmetric-only chain (`CK -> HKDF -> next_CK`) is forward-secret but not PCS: + +- if the current chain key is compromised, the attacker can derive future keys until a rekey event. + +**Required property:** The session must periodically incorporate fresh asymmetric shared secrets into the root key (DH or KEM ratchet) to regain secrecy after compromise ends. + +In this codebase, the design target is: + +- **DH-ratchet** inside sessions (PCS against classical compromise). +- **KEM refresh** via session re-handshake policy (PQC refresh, and PCS reset when bidirectional traffic is absent). + +Regression coverage should include: + +- topic binding (ciphertext replayed on another topic must fail) +- replay rejection (duplicate packet must fail) +- bounded out-of-order acceptance within a skip window + +--- + +## 5. CoAP Security Invariants + +**Invariant:** Signed payloads are authenticity-only and MUST NOT be described as transport security. + +**Invariant:** Custom session encryption is not OSCORE/DTLS and MUST be marked experimental. + +For IIoT-critical deployments, the “industrial path” is: + +- OSCORE (RFC 8613), typically with EDHOC (RFC 9528), or +- DTLS, when OSCORE is not viable. + +See `docs/coap.md`. + +--- + +## 6. Attestation Invariants + +**Invariant:** Attestation MUST NOT be treated as a root-of-trust unless backed by a real TPM/TEE chain. + +The current model is a functional placeholder: + +- software provider signs quotes with the identity signing key and uses synthetic PCRs +- the “AK == sig_pk” binding is a simplification + +Any production-grade claim requires: + +- EK/AK separation, manufacturer chain, PCR policy, event log, and verifier policy definition. + diff --git a/docs/mqtt.md b/docs/mqtt.md index be40699..6d64db8 100644 --- a/docs/mqtt.md +++ b/docs/mqtt.md @@ -132,11 +132,63 @@ Fleet security policy is a CA-signed update stream (not broker-trusted configura Policy is treated as an explicit **security gate**: -- `require_sessions`: disallows v1 per-message hybrid encryption and requires v2 sessions (ratchet). +- `require_sessions`: disallows v1 per-message hybrid encryption and requires v3 forward-secure sessions (double ratchet). - `min_revocation_seq`: fail-closed until emergency revocations are caught up. - `ttl_secs`: when secure time is available, new handshakes and encrypted sends fail-closed once the policy becomes stale. - `require_rollback_resistant_storage`: fail-closed unless the provider backend is rollback resistant. +## Forward-Secure Sessions (v3: Double Ratchet) + +Per-message hybrid encryption (`publish_encrypted`) is simple but does **not** provide post-compromise security (PCS): if a peer identity key is compromised, historical traffic is still safe (PQC), but the attacker can forge traffic until revocation/rotation and the receiver has to verify signatures on every packet. + +For critical IIoT deployments, the crate supports forward-secure authenticated sessions: + +- Session establishment is authenticated by long-term Falcon identities (no broker trust). +- Initial shared secret is hybrid: Kyber (PQC) + X25519 (classical) handshake DH. +- Session traffic uses a DH-driven **double ratchet**: + - per-message symmetric ratchet (KDF chain) for forward secrecy + - periodic DH ratchet steps for PCS recovery after compromise ends (when bidirectional traffic exists) + +**Important:** the DH ratchet step is X25519 (classical). That gives PCS against a classical attacker who is no longer on the endpoint, but it is not “post-quantum PCS”. For PQC refresh, fleets should enforce periodic session re-handshakes (Kyber + X25519) via policy (`session_rekey_after_msgs` / `session_rekey_after_secs`) until a KEM-based in-session ratchet exists. + +### Handshake (topics + messages) + +Session control uses directed topics: + +- Initiator → Responder: `pqc/session/init/` (`SessionInitMessage`) +- Responder → Initiator: `pqc/session/resp/` (`SessionResponseMessage`) + +Both messages are JSON and include a detached Falcon signature over a canonical payload that binds: + +- MQTT topic +- initiator_id, responder_id +- session_id (16 bytes) +- session_seq (monotonic per-peer init sequence) +- initiator/responder ephemeral X25519 PKs +- initiator ephemeral Kyber PK + responder Kyber ciphertext +- timestamp (informational only) + +### Encrypted session packet (wire format) + +Session traffic is a binary packet carried as MQTT payload: + +``` +[sender_id_len:u16][sender_id][v=3][session_id:16][dh_pub:32][msg_num:u32][pn:u32][ct_len:u32][ct] +``` + +Where: + +- `dh_pub` is the sender’s current ratchet DH public key. +- `msg_num` is the message number in the current sending chain. +- `pn` is the previous chain length (Double Ratchet “PN”), used for skipped-key recovery across DH transitions. +- `ct` is AES-256-GCM ciphertext+tag, with AAD binding `(sender_id, receiver_id, topic, session_id, dh_pub, msg_num, pn)`. + +### Operational notes (availability vs security) + +- The responder derives its send chain only after processing the first inbound DH ratchet step; the initiator should send first. +- Long partitions are handled via retained policy/revocation updates and best-effort sync requests (`pqc/policy/sync/v1`, `pqc/revocations/sync/v1`). +- Session rekey thresholds are driven by fleet policy (`session_rekey_after_msgs`, `session_rekey_after_secs`) and trigger a fresh handshake. + ### Anti-Rollback Floors (Sealed Monotonic Counters) Critical fleets must assume filesystem compromise and rollback attempts. To model this explicitly, the client persists: diff --git a/docs/security.md b/docs/security.md index 5efdcff..4192ff0 100644 --- a/docs/security.md +++ b/docs/security.md @@ -2,6 +2,8 @@ This document provides detailed information about the security features and considerations of the PQC-IIoT crate. +For the project’s invariant-level security contract, see `SECURITY_INVARIANTS.md` at the repository root. That document is the reference for “what must always remain true” under adversarial conditions (replay, rollback, partitions, broker compromise). + ## Table of Contents - [Cryptographic Primitives](#cryptographic-primitives) @@ -35,15 +37,6 @@ This document provides detailed information about the security features and cons - Optimized for embedded systems - Constant-time implementation -#### BIKE (Experimental) -- Code-based KEM -- Security levels: - - Level 1 (experimental) - - Level 3 (experimental) - - Level 5 (experimental) -- For research purposes -- Not recommended for production use - ### Digital Signatures #### Falcon @@ -68,18 +61,16 @@ This document provides detailed information about the security features and cons ## Protocol Security ### MQTT Security -- Post-quantum key exchange -- Message authentication -- Replay protection -- Topic validation -- Access control +- Provisioned identity (strict-mode) via signed operational certificates + key announcements bound to peer id/topic. +- v1 per-message hybrid encryption (Kyber + X25519 → AES-256-GCM) with signature authentication and sliding-window replay protection. +- v3 forward-secure sessions (authenticated handshake + DH-driven double ratchet) with topic/context binding and bounded out-of-order acceptance. +- Partition-aware policy + revocation updates (CA-signed, monotonic, retained) with fail-closed gates for high-risk operations. +- Asymmetric-cost DoS containment: size limits + peer-id sanitation + per-peer/global token-bucket budgets before expensive crypto. ### CoAP Security -- Post-quantum key exchange -- Message authentication -- Resource protection -- Path validation -- Access control +- Signed payload mode: authenticity-only of application payloads when peer keys are pinned. +- Custom secure session mode: confidentiality + integrity + anti-replay at the application layer (not OSCORE/DTLS). +- For interoperability/compliance-critical deployments, OSCORE (with EDHOC) or DTLS is still the “industrial” transport/security boundary. ## Implementation Security diff --git a/src/mqtt_secure.rs b/src/mqtt_secure.rs index 3d734b6..c9e363c 100644 --- a/src/mqtt_secure.rs +++ b/src/mqtt_secure.rs @@ -272,7 +272,7 @@ pub struct SecureMqttClient { storage_id: String, sequence_number: u64, strict_mode: bool, - /// If true, disallow v1 per-message hybrid encryption and require session/ratchet (v2). + /// If true, disallow v1 per-message hybrid encryption and require forward-secure sessions (v3, double ratchet). require_sessions: bool, /// If true, require rollback-resistant sealing/storage in the active fleet policy. require_rollback_resistant_storage: bool, @@ -635,7 +635,7 @@ fn kyber_for_sk_len(len: usize) -> Result { } } -fn derive_session_chain_keys_v1(kem_ss: &[u8], dh_ss: &[u8]) -> Result<([u8; 32], [u8; 32])> { +fn derive_session_root_key_v3(kem_ss: &[u8], dh_ss: &[u8]) -> Result<[u8; 32]> { if kem_ss.len() != 32 || dh_ss.len() != 32 { return Err(Error::CryptoError(format!( "Invalid session shared secret lengths: kem_ss={} dh_ss={}", @@ -643,46 +643,110 @@ fn derive_session_chain_keys_v1(kem_ss: &[u8], dh_ss: &[u8]) -> Result<([u8; 32] dh_ss.len() ))); } + + // Session root key material: Kyber (PQC) + X25519 (classical) handshake secrets. + // The DH ratchet mixes additional DH outputs into this root over time. let mut ikm = [0u8; 64]; ikm[..32].copy_from_slice(kem_ss); ikm[32..].copy_from_slice(dh_ss); let hk = Hkdf::::new(None, &ikm); - let mut ck_initiator = [0u8; 32]; - let mut ck_responder = [0u8; 32]; - hk.expand(b"pqc-iiot:mqtt-session:v1:ck-initiator", &mut ck_initiator) - .map_err(|_| Error::CryptoError("HKDF expand failed (ck-initiator)".into()))?; - hk.expand(b"pqc-iiot:mqtt-session:v1:ck-responder", &mut ck_responder) - .map_err(|_| Error::CryptoError("HKDF expand failed (ck-responder)".into()))?; + let mut rk0 = [0u8; 32]; + hk.expand(b"pqc-iiot:mqtt-session:v3:rk0", &mut rk0) + .map_err(|_| Error::CryptoError("HKDF expand failed (rk0)".into()))?; ikm.zeroize(); - - Ok((ck_initiator, ck_responder)) + Ok(rk0) } const MQTT_SESSION_MAX_SKIPPED_KEYS: usize = 50; const MQTT_SESSION_MAX_MESSAGES: u32 = 100_000; +const MQTT_SESSION_SKIPPED_KEYS_TOTAL_LIMIT: usize = 4 * MQTT_SESSION_MAX_SKIPPED_KEYS; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +struct SkippedKeyId { + dh_pub: [u8; 32], + msg_num: u32, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct SessionPacketHeaderV3 { + dh_pub: [u8; 32], + msg_num: u32, + prev_chain_len: u32, +} -#[derive(Debug)] struct MqttSession { session_id: [u8; 16], created_at: Instant, - send_chain_key: [u8; 32], - recv_chain_key: [u8; 32], + // DH-driven double ratchet state (PCS). + root_key: [u8; 32], + dh_send_sk: X25519StaticSecret, + dh_send_pk: [u8; 32], + dh_recv_pk: [u8; 32], + send_chain_key: Option<[u8; 32]>, + recv_chain_key: Option<[u8; 32]>, + // Per-chain counters. send_msg_num: u32, recv_msg_num: u32, - skipped_message_keys: std::collections::HashMap, + // Previous chain length (PN in the Double Ratchet header). + prev_chain_len: u32, + // Monotonic send counter for policy rekey thresholds (not a cryptographic nonce/input). + sent_messages_total: u32, + // Out-of-order / late delivery support across DH chain transitions. + skipped_message_keys: std::collections::HashMap, } impl MqttSession { - fn new(session_id: [u8; 16], send_chain_key: [u8; 32], recv_chain_key: [u8; 32]) -> Self { + fn new_initiator( + session_id: [u8; 16], + rk0: [u8; 32], + peer_handshake_dh: [u8; 32], + ) -> Result { + // Initiator starts with a fresh DH sending key to force an early DH-ratchet step on the responder. + let dh_send_sk = X25519StaticSecret::random_from_rng(OsRng); + let dh_send_pk = X25519PublicKey::from(&dh_send_sk).to_bytes(); + let (root_key, send_chain_key) = Self::kdf_rk(&rk0, &dh_send_sk, &peer_handshake_dh)?; + + Ok(Self { + session_id, + created_at: Instant::now(), + root_key, + dh_send_sk, + dh_send_pk, + dh_recv_pk: peer_handshake_dh, + send_chain_key: Some(send_chain_key), + recv_chain_key: None, + send_msg_num: 0, + recv_msg_num: 0, + prev_chain_len: 0, + sent_messages_total: 0, + skipped_message_keys: std::collections::HashMap::new(), + }) + } + + fn new_responder( + session_id: [u8; 16], + rk0: [u8; 32], + responder_handshake_sk: X25519StaticSecret, + responder_handshake_pk: [u8; 32], + initiator_handshake_pk: [u8; 32], + ) -> Self { + // Responder cannot derive a receive chain until it sees the initiator's first ratchet DH key. + // It keeps its handshake DH key as DHs, so it can ratchet immediately on the first message. Self { session_id, created_at: Instant::now(), - send_chain_key, - recv_chain_key, + root_key: rk0, + dh_send_sk: responder_handshake_sk, + dh_send_pk: responder_handshake_pk, + dh_recv_pk: initiator_handshake_pk, + send_chain_key: None, + recv_chain_key: None, send_msg_num: 0, recv_msg_num: 0, + prev_chain_len: 0, + sent_messages_total: 0, skipped_message_keys: std::collections::HashMap::new(), } } @@ -692,22 +756,40 @@ impl MqttSession { .map_err(|_| Error::CryptoError("HKDF PRK init failed".into()))?; let mut mk = [0u8; 32]; let mut next_ck = [0u8; 32]; - hkdf.expand(b"pqc-iiot:mqtt-session:v1:mk", &mut mk) + hkdf.expand(b"pqc-iiot:mqtt-session:v3:mk", &mut mk) .map_err(|_| Error::CryptoError("HKDF expand failed (mk)".into()))?; - hkdf.expand(b"pqc-iiot:mqtt-session:v1:ck", &mut next_ck) + hkdf.expand(b"pqc-iiot:mqtt-session:v3:ck", &mut next_ck) .map_err(|_| Error::CryptoError("HKDF expand failed (ck)".into()))?; Ok((next_ck, mk)) } - fn aad_v2( + fn kdf_rk( + rk: &[u8; 32], + dh_send_sk: &X25519StaticSecret, + dh_recv_pk: &[u8; 32], + ) -> Result<([u8; 32], [u8; 32])> { + let peer_pub = X25519PublicKey::from(*dh_recv_pk); + let dh_out = dh_send_sk.diffie_hellman(&peer_pub).to_bytes(); + + let (_, hkdf) = Hkdf::::extract(Some(rk), dh_out.as_slice()); + let mut new_rk = [0u8; 32]; + let mut ck = [0u8; 32]; + hkdf.expand(b"pqc-iiot:mqtt-session:v3:rk", &mut new_rk) + .map_err(|_| Error::CryptoError("HKDF expand failed (rk)".into()))?; + hkdf.expand(b"pqc-iiot:mqtt-session:v3:ck", &mut ck) + .map_err(|_| Error::CryptoError("HKDF expand failed (ck)".into()))?; + Ok((new_rk, ck)) + } + + fn aad_v3( sender_id: &str, receiver_id: &str, topic: &str, session_id: &[u8; 16], - msg_num: u32, + header: &SessionPacketHeaderV3, ) -> Vec { let mut aad = Vec::new(); - aad.extend_from_slice(b"pqc-iiot:mqtt-msg:v2"); + aad.extend_from_slice(b"pqc-iiot:mqtt-msg:v3"); aad.extend_from_slice(&(sender_id.len() as u16).to_be_bytes()); aad.extend_from_slice(sender_id.as_bytes()); aad.extend_from_slice(&(receiver_id.len() as u16).to_be_bytes()); @@ -715,38 +797,115 @@ impl MqttSession { aad.extend_from_slice(&(topic.len() as u16).to_be_bytes()); aad.extend_from_slice(topic.as_bytes()); aad.extend_from_slice(session_id); - aad.extend_from_slice(&msg_num.to_be_bytes()); + aad.extend_from_slice(&header.dh_pub); + aad.extend_from_slice(&header.msg_num.to_be_bytes()); + aad.extend_from_slice(&header.prev_chain_len.to_be_bytes()); aad } - fn nonce_v2(session_id: &[u8; 16], msg_num: u32) -> [u8; 12] { + fn nonce_v3(session_id: &[u8; 16], msg_num: u32) -> [u8; 12] { let mut nonce = [0u8; 12]; nonce[..8].copy_from_slice(&session_id[..8]); nonce[8..].copy_from_slice(&msg_num.to_be_bytes()); nonce } - fn encrypt_v2( + fn clear_skipped_keys(&mut self) { + for (_, key) in self.skipped_message_keys.iter_mut() { + key.zeroize(); + } + self.skipped_message_keys.clear(); + } + + fn store_skipped_key(&mut self, dh_pub: [u8; 32], msg_num: u32, mk: [u8; 32]) { + if self.skipped_message_keys.len() >= MQTT_SESSION_SKIPPED_KEYS_TOTAL_LIMIT { + self.clear_skipped_keys(); + } + self.skipped_message_keys + .insert(SkippedKeyId { dh_pub, msg_num }, mk); + } + + fn skip_message_keys(&mut self, until: u32) -> Result<()> { + let Some(mut ck) = self.recv_chain_key else { + return Ok(()); + }; + + if until <= self.recv_msg_num { + return Ok(()); + } + + let delta = until - self.recv_msg_num; + if delta > MQTT_SESSION_MAX_SKIPPED_KEYS as u32 { + return Err(Error::CryptoError( + "Skipping too many message keys (limit exceeded)".into(), + )); + } + + while self.recv_msg_num < until { + let (next_ck, mk) = Self::kdf_ck(&ck)?; + self.store_skipped_key(self.dh_recv_pk, self.recv_msg_num, mk); + ck = next_ck; + self.recv_msg_num = self.recv_msg_num.saturating_add(1); + } + + self.recv_chain_key = Some(ck); + Ok(()) + } + + fn dh_ratchet(&mut self, new_peer_dh: [u8; 32]) -> Result<()> { + // Step 1: update receiving chain. + self.dh_recv_pk = new_peer_dh; + let (rk1, recv_ck) = Self::kdf_rk(&self.root_key, &self.dh_send_sk, &self.dh_recv_pk)?; + self.root_key = rk1; + self.recv_chain_key = Some(recv_ck); + self.recv_msg_num = 0; + + // Step 2: rotate DHs and derive sending chain. + self.prev_chain_len = self.send_msg_num; + self.send_msg_num = 0; + + self.dh_send_sk = X25519StaticSecret::random_from_rng(OsRng); + self.dh_send_pk = X25519PublicKey::from(&self.dh_send_sk).to_bytes(); + let (rk2, send_ck) = Self::kdf_rk(&self.root_key, &self.dh_send_sk, &self.dh_recv_pk)?; + self.root_key = rk2; + self.send_chain_key = Some(send_ck); + Ok(()) + } + + fn encrypt_v3( &mut self, sender_id: &str, receiver_id: &str, topic: &str, plaintext: &[u8], - ) -> Result<(u32, Vec)> { - if self.send_msg_num >= MQTT_SESSION_MAX_MESSAGES { + ) -> Result<(SessionPacketHeaderV3, Vec)> { + if self.sent_messages_total >= MQTT_SESSION_MAX_MESSAGES { return Err(Error::ProtocolError(format!( "Session {} exhausted message budget (send)", hex::encode(self.session_id) ))); } - let (next_ck, mk) = Self::kdf_ck(&self.send_chain_key)?; - self.send_chain_key = next_ck; + let Some(ck) = self.send_chain_key else { + return Err(Error::ProtocolError( + "Session send chain not ready (await first inbound ratchet)".into(), + )); + }; + + let (next_ck, mk) = Self::kdf_ck(&ck)?; + self.send_chain_key = Some(next_ck); let msg_num = self.send_msg_num; self.send_msg_num = self.send_msg_num.saturating_add(1); + self.sent_messages_total = self.sent_messages_total.saturating_add(1); - let aad = Self::aad_v2(sender_id, receiver_id, topic, &self.session_id, msg_num); - let nonce_bytes = Self::nonce_v2(&self.session_id, msg_num); + let header = SessionPacketHeaderV3 { + dh_pub: self.dh_send_pk, + msg_num, + prev_chain_len: self.prev_chain_len, + }; + + let aad = Self::aad_v3(sender_id, receiver_id, topic, &self.session_id, &header); + let nonce_bytes = Self::nonce_v3(&self.session_id, msg_num); let cipher = Aes256Gcm::new(Key::::from_slice(&mk)); let ciphertext = cipher @@ -759,20 +918,20 @@ impl MqttSession { ) .map_err(|_| Error::CryptoError("AES-GCM encryption failed".into()))?; - Ok((msg_num, ciphertext)) + Ok((header, ciphertext)) } - fn decrypt_with_mk_v2( + fn decrypt_with_mk_v3( &self, sender_id: &str, receiver_id: &str, topic: &str, - msg_num: u32, + header: &SessionPacketHeaderV3, mk: &[u8; 32], ciphertext: &[u8], ) -> Result> { - let aad = Self::aad_v2(sender_id, receiver_id, topic, &self.session_id, msg_num); - let nonce_bytes = Self::nonce_v2(&self.session_id, msg_num); + let aad = Self::aad_v3(sender_id, receiver_id, topic, &self.session_id, header); + let nonce_bytes = Self::nonce_v3(&self.session_id, header.msg_num); let cipher = Aes256Gcm::new(Key::::from_slice(mk)); cipher .decrypt( @@ -785,59 +944,78 @@ impl MqttSession { .map_err(|_| Error::CryptoError("AES-GCM decryption failed".into())) } - fn decrypt_v2( + fn decrypt_v3( &mut self, sender_id: &str, receiver_id: &str, topic: &str, - msg_num: u32, + header: SessionPacketHeaderV3, ciphertext: &[u8], ) -> Result> { - if let Some(mk) = self.skipped_message_keys.remove(&msg_num) { - return self.decrypt_with_mk_v2( + if let Some(mk) = self.skipped_message_keys.remove(&SkippedKeyId { + dh_pub: header.dh_pub, + msg_num: header.msg_num, + }) { + return self.decrypt_with_mk_v3( sender_id, receiver_id, topic, - msg_num, + &header, &mk, ciphertext, ); } - if msg_num < self.recv_msg_num { + // DH ratchet step: peer rotated its DH sending key. + if header.dh_pub != self.dh_recv_pk { + self.skip_message_keys(header.prev_chain_len)?; + self.dh_ratchet(header.dh_pub)?; + } + + let Some(mut ck) = self.recv_chain_key else { + return Err(Error::CryptoError( + "Session receive chain not ready (await DH ratchet)".into(), + )); + }; + + if header.msg_num < self.recv_msg_num { return Err(Error::CryptoError("Message too old / replay".into())); } - let delta = msg_num - self.recv_msg_num; + let delta = header.msg_num - self.recv_msg_num; if delta > MQTT_SESSION_MAX_SKIPPED_KEYS as u32 { return Err(Error::CryptoError( "Message too far in the future (skip limit exceeded)".into(), )); } - while self.recv_msg_num < msg_num { - let (next_ck, mk) = Self::kdf_ck(&self.recv_chain_key)?; - self.skipped_message_keys.insert(self.recv_msg_num, mk); - self.recv_chain_key = next_ck; + while self.recv_msg_num < header.msg_num { + let (next_ck, mk) = Self::kdf_ck(&ck)?; + self.store_skipped_key(self.dh_recv_pk, self.recv_msg_num, mk); + ck = next_ck; self.recv_msg_num = self.recv_msg_num.saturating_add(1); } - let (next_ck, mk) = Self::kdf_ck(&self.recv_chain_key)?; - self.recv_chain_key = next_ck; + let (next_ck, mk) = Self::kdf_ck(&ck)?; + ck = next_ck; self.recv_msg_num = self.recv_msg_num.saturating_add(1); + self.recv_chain_key = Some(ck); - self.decrypt_with_mk_v2(sender_id, receiver_id, topic, msg_num, &mk, ciphertext) + self.decrypt_with_mk_v3(sender_id, receiver_id, topic, &header, &mk, ciphertext) } } impl Drop for MqttSession { fn drop(&mut self) { - self.send_chain_key.zeroize(); - self.recv_chain_key.zeroize(); - for (_, key) in self.skipped_message_keys.iter_mut() { - key.zeroize(); + self.root_key.zeroize(); + // x25519_dalek secrets are zeroized on drop. + if let Some(ck) = self.send_chain_key.as_mut() { + ck.zeroize(); } - self.skipped_message_keys.clear(); + if let Some(ck) = self.recv_chain_key.as_mut() { + ck.zeroize(); + } + self.clear_skipped_keys(); } } @@ -2868,7 +3046,7 @@ impl SecureMqttClient { Ok(()) } - /// Publish an encrypted message using the forward-secure session ratchet (v2). + /// Publish an encrypted message using the forward-secure session ratchet (v3, double ratchet). /// /// Requires a session to be established via `initiate_session()` and a corresponding response. pub fn publish_encrypted_session( @@ -2887,7 +3065,7 @@ impl SecureMqttClient { let secs = self.session_rekey_after_secs; let mut required = false; if let Some(max_msgs) = msgs { - if peer_sessions.current.send_msg_num >= max_msgs { + if peer_sessions.current.sent_messages_total >= max_msgs { required = true; } } @@ -2928,10 +3106,10 @@ impl SecureMqttClient { })?; let session = peer_sessions.current_mut(); - let (msg_num, ciphertext) = - session.encrypt_v2(&self.client_id, target_peer_id, topic, payload)?; + let (header, ciphertext) = + session.encrypt_v3(&self.client_id, target_peer_id, topic, payload)?; - // Packet: [sender_id_len:u16][sender_id][v=2][session_id:16][msg_num:u32][ct_len:u32][ct] + // Packet v3 (double ratchet): [sender_id_len:u16][sender_id][v=3][session_id:16][dh_pub:32][msg_num:u32][pn:u32][ct_len:u32][ct] let sender_id_bytes = self.client_id.as_bytes(); let sender_id_len = sender_id_bytes.len() as u16; @@ -2940,13 +3118,16 @@ impl SecureMqttClient { } let ct_len = ciphertext.len() as u32; - let mut packet = - Vec::with_capacity(2 + sender_id_bytes.len() + 1 + 16 + 4 + 4 + ciphertext.len()); + let mut packet = Vec::with_capacity( + 2 + sender_id_bytes.len() + 1 + 16 + 32 + 4 + 4 + 4 + ciphertext.len(), + ); packet.extend_from_slice(&sender_id_len.to_be_bytes()); packet.extend_from_slice(sender_id_bytes); - packet.push(2); + packet.push(3); packet.extend_from_slice(&session.session_id); - packet.extend_from_slice(&msg_num.to_be_bytes()); + packet.extend_from_slice(&header.dh_pub); + packet.extend_from_slice(&header.msg_num.to_be_bytes()); + packet.extend_from_slice(&header.prev_chain_len.to_be_bytes()); packet.extend_from_slice(&ct_len.to_be_bytes()); packet.extend_from_slice(&ciphertext); @@ -3231,11 +3412,12 @@ impl SecureMqttClient { return Ok(None); } trace!("mqtt rx extracted sender_id={}", sender_id); - // Session/ratchet encrypted packet (v2): [2][session_id:16][msg_num:u32][ct_len:u32][ct] + // Session/ratchet encrypted packet (v3, double ratchet): + // [3][session_id:16][dh_pub:32][msg_num:u32][pn:u32][ct_len:u32][ct] // No per-message signature; authenticity is provided by the established session keys. - if !rest.is_empty() && rest[0] == 2 { + if !rest.is_empty() && rest[0] == 3 { if let Some(plaintext) = - self.try_decrypt_session_packet_v2(&topic, sender_id, rest)? + self.try_decrypt_session_packet_v3(&topic, sender_id, rest)? { return Ok(Some((topic, plaintext))); } @@ -3598,19 +3780,24 @@ impl SecureMqttClient { } }; - let (ck_initiator, ck_responder) = match derive_session_chain_keys_v1(&kem_ss, &dh_ss) { + let rk0 = match derive_session_root_key_v3(&kem_ss, &dh_ss) { Ok(v) => v, Err(e) => { warn!( - "Ignoring session init from {}: session key derivation failed: {}", + "Ignoring session init from {}: session root derivation failed: {}", msg.initiator_id, e ); return Ok(()); } }; - // Responder uses ck_responder for sending, ck_initiator for receiving. - let session = MqttSession::new(session_id, ck_responder, ck_initiator); + let session = MqttSession::new_responder( + session_id, + rk0, + responder_x_sk, + responder_x_pk, + initiator_x_pk, + ); if let Some(existing) = self.sessions.get_mut(&msg.initiator_id) { existing.rotate_to(session); @@ -3859,11 +4046,11 @@ impl SecureMqttClient { } }; - let (ck_initiator, ck_responder) = match derive_session_chain_keys_v1(&kem_ss, &dh_ss) { + let rk0 = match derive_session_root_key_v3(&kem_ss, &dh_ss) { Ok(v) => v, Err(e) => { warn!( - "Ignoring session response from {}: session key derivation failed: {}", + "Ignoring session response from {}: session root derivation failed: {}", msg.responder_id, e ); self.pending_sessions.insert(session_id, pending); @@ -3871,8 +4058,18 @@ impl SecureMqttClient { } }; - // Initiator uses ck_initiator for sending, ck_responder for receiving. - let session = MqttSession::new(session_id, ck_initiator, ck_responder); + // Initiator starts with a fresh DH sending key and immediately derives CKs. + let session = match MqttSession::new_initiator(session_id, rk0, responder_x_pk) { + Ok(v) => v, + Err(e) => { + warn!( + "Ignoring session response from {}: ratchet init failed: {}", + msg.responder_id, e + ); + self.pending_sessions.insert(session_id, pending); + return Ok(()); + } + }; if let Some(existing) = self.sessions.get_mut(&msg.responder_id) { existing.rotate_to(session); } else { @@ -3884,14 +4081,14 @@ impl SecureMqttClient { Ok(()) } - fn try_decrypt_session_packet_v2( + fn try_decrypt_session_packet_v3( &mut self, topic: &str, sender_id: &str, rest: &[u8], ) -> Result>> { - // [2][session_id:16][msg_num:u32][ct_len:u32][ct] - const HEADER_LEN: usize = 1 + 16 + 4 + 4; + // [3][session_id:16][dh_pub:32][msg_num:u32][pn:u32][ct_len:u32][ct] + const HEADER_LEN: usize = 1 + 16 + 32 + 4 + 4 + 4; if rest.len() < HEADER_LEN { warn!( "Dropping session packet from {}: too short ({} bytes)", @@ -3903,8 +4100,11 @@ impl SecureMqttClient { let mut session_id = [0u8; 16]; session_id.copy_from_slice(&rest[1..17]); - let msg_num = u32::from_be_bytes([rest[17], rest[18], rest[19], rest[20]]); - let ct_len = u32::from_be_bytes([rest[21], rest[22], rest[23], rest[24]]) as usize; + let mut dh_pub = [0u8; 32]; + dh_pub.copy_from_slice(&rest[17..49]); + let msg_num = u32::from_be_bytes([rest[49], rest[50], rest[51], rest[52]]); + let pn = u32::from_be_bytes([rest[53], rest[54], rest[55], rest[56]]); + let ct_len = u32::from_be_bytes([rest[57], rest[58], rest[59], rest[60]]) as usize; if rest.len() != HEADER_LEN + ct_len { warn!( @@ -3940,7 +4140,13 @@ impl SecureMqttClient { } }; - match session.decrypt_v2(sender_id, &self.client_id, topic, msg_num, ciphertext) { + let header = SessionPacketHeaderV3 { + dh_pub, + msg_num, + prev_chain_len: pn, + }; + + match session.decrypt_v3(sender_id, &self.client_id, topic, header, ciphertext) { Ok(pt) => Ok(Some(pt)), Err(e) => { warn!("Session decrypt failed for {}: {}", sender_id, e); diff --git a/src/ratchet.rs b/src/ratchet.rs index 986a3ff..037e888 100644 --- a/src/ratchet.rs +++ b/src/ratchet.rs @@ -16,6 +16,10 @@ use std::collections::HashMap; // Requires std for now. For no-std, use hashbrow /// Max number of skipped message keys to store (DoS protection) const MAX_SKIPPED_KEYS: usize = 50; +/// Hard cap on messages per ratchet session (overflow + availability boundary). +/// +/// This prevents `u32` counter rollover from silently breaking replay protection and key evolution. +const MAX_RATCHET_MESSAGES: u32 = 100_000; /// KEM-based Double Ratchet Session /// @@ -202,14 +206,10 @@ impl RatchetSession { /// Encrypts a message payload using the current Message Key (MK). /// Advances the sending chain. pub fn encrypt(&mut self, plaintext: &[u8]) -> Result { - // 1. Zero-Knowledge Key Rotation (Self-Healing) - // Automatic rotation after 1000 messages to limit breach impact. - if self.msg_num_send >= 1000 { - let (new_rk, new_ck) = Self::kdf_rk(&self.root_key, &[0xDE, 0xAD, 0xBE, 0xEF])?; - self.root_key = new_rk; - self.chain_key_send = new_ck; - self.msg_num_send = 0; - // Note: In production, we'd signal this rotation to the peer (e.g. in the header) + if self.msg_num_send >= MAX_RATCHET_MESSAGES { + return Err(Error::ProtocolError( + "Ratchet session message limit reached; rekey required".into(), + )); } // 2. Symmetric Ratchet Step diff --git a/src/security/policy.rs b/src/security/policy.rs index 87ff243..ddd7eb5 100644 --- a/src/security/policy.rs +++ b/src/security/policy.rs @@ -42,7 +42,7 @@ pub struct FleetPolicyUpdate { pub strict_mode: bool, /// If true, peers are marked trusted only after a verifier-driven attestation roundtrip. pub attestation_required: bool, - /// If true, disallow v1 per-message KEM/signature encryption and require session/ratchet (v2) before sending. + /// If true, disallow v1 per-message KEM/signature encryption and require forward-secure sessions before sending. pub require_sessions: bool, /// Optional minimum revocation sequence that must be applied before accepting trust transitions. /// diff --git a/tests/mqtt_invariants.rs b/tests/mqtt_invariants.rs index e23e21b..13ddda1 100644 --- a/tests/mqtt_invariants.rs +++ b/tests/mqtt_invariants.rs @@ -203,6 +203,30 @@ fn mqtt_session_ratchet_establishes_and_binds_topic_and_rejects_replay( "Expected no acceptance on topic_bad (topic binding)" ); + // Now ensure the responder side can send under the ratchet after receiving the first message. + // This is a regression guard for the DH-driven session initialization (PCS): Bob must derive a + // sending chain upon the first inbound DH ratchet step. + let topic_reply = format!("secure/session_reply_{}", suffix); + alice.subscribe(&topic_reply)?; + + bob.publish_encrypted(&topic_reply, b"SESSION_REPLY", &alice_id)?; + + let start = Instant::now(); + let mut got_reply = 0u32; + while start.elapsed() < Duration::from_secs(3) { + alice.poll(|t, p| { + if t == topic_reply && p == b"SESSION_REPLY" { + got_reply += 1; + } + })?; + if got_reply >= 1 { + break; + } + std::thread::sleep(Duration::from_millis(20)); + } + + assert_eq!(got_reply, 1, "Expected exactly one session reply delivery"); + Ok(()) }