diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 13b7b120..c1856b0d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -34,6 +34,13 @@ jobs: - name: Security scan (tests) run: bandit -r tests -ll + - name: Attestation fuzz regression gate + env: + RC_ADMIN_KEY: "0123456789abcdef0123456789abcdef" + DB_PATH: ":memory:" + ATTEST_FUZZ_CASES: "10000" + run: python -m pytest tests/test_attestation_fuzz.py -k mutation_regression_no_unhandled_exceptions -v + - name: Run tests with pytest (blocking) env: RC_ADMIN_KEY: "0123456789abcdef0123456789abcdef" diff --git a/.gitignore b/.gitignore index 8d1f90c7..53012fc9 100644 --- a/.gitignore +++ b/.gitignore @@ -32,6 +32,9 @@ Thumbs.db # Logs *.log +.pytest_cache/ +pytest-cache-files-*/ +tests/.tmp_attestation/ # Windows miner build artifacts Rustchain/miners/windows/dist/ diff --git a/README.md b/README.md index e4574d4d..f9cf2863 100644 --- a/README.md +++ b/README.md @@ -221,6 +221,10 @@ Earn **RTC** by contributing to the RustChain ecosystem! --- +## Testing Notes + +- Attestation malformed-input fuzz harness and replayable corpus: [docs/attestation_fuzzing.md](docs/attestation_fuzzing.md) + ## 💰 Antiquity Multipliers Your hardware's age determines your mining rewards: diff --git a/docs/attestation_fuzzing.md b/docs/attestation_fuzzing.md new file mode 100644 index 00000000..c4904470 --- /dev/null +++ b/docs/attestation_fuzzing.md @@ -0,0 +1,47 @@ +# Attestation Malformed-Input Regression Harness + +This repository includes a deterministic malformed-input regression gate for `POST /attest/submit` plus a replayable regression corpus under `tests/attestation_corpus/`. + +## Corpus Classes + +Current explicit corpus entries cover these malformed input classes: + +1. Invalid JSON root: `null` +2. Invalid JSON root: array +3. Miner identifier shape mismatch +4. Device payload scalar/object mismatch +5. Signals payload scalar/object mismatch +6. Signals MAC list shape mismatch +7. Fingerprint checks array/object mismatch +8. Report payload scalar/object mismatch + +## Replay One Corpus Entry + +```bash +python tests/replay_attestation_corpus.py tests/attestation_corpus/malformed_report_scalar.json +``` + +The script prints the HTTP status code and parsed JSON response, and exits non-zero if replay causes a server-side `5xx`. + +## Quick Regression Gate + +```bash +python -m pytest tests/test_attestation_fuzz.py -v +``` + +## 10,000-Case Mutation Run + +PowerShell: + +```powershell +$env:ATTEST_FUZZ_CASES = "10000" +python -m pytest tests/test_attestation_fuzz.py -k mutation_regression_no_unhandled_exceptions -v +``` + +Bash: + +```bash +ATTEST_FUZZ_CASES=10000 python -m pytest tests/test_attestation_fuzz.py -k mutation_regression_no_unhandled_exceptions -v +``` + +This is the CI-mode gate for "no unhandled exceptions" in the attestation parsing path. Set `ATTEST_FUZZ_SEED` only when you need to reproduce a specific random sequence locally. diff --git a/node/rustchain_v2_integrated_v2.2.1_rip200.py b/node/rustchain_v2_integrated_v2.2.1_rip200.py index f6afcbde..4a24e8a6 100644 --- a/node/rustchain_v2_integrated_v2.2.1_rip200.py +++ b/node/rustchain_v2_integrated_v2.2.1_rip200.py @@ -3,7 +3,7 @@ RustChain v2 - Integrated Server Includes RIP-0005 (Epoch Rewards), RIP-0008 (Withdrawals), RIP-0009 (Finality) """ -import os, time, json, secrets, hashlib, hmac, sqlite3, base64, struct, uuid, glob, logging, sys, binascii, math +import os, time, json, secrets, hashlib, hmac, sqlite3, base64, struct, uuid, glob, logging, sys, binascii, math, re import ipaddress from urllib.parse import urlparse from flask import Flask, request, jsonify, g, send_from_directory, send_file, abort @@ -104,6 +104,185 @@ def generate_latest(): return b"# Prometheus not available" HOF_DIR = os.path.join(REPO_ROOT, "web", "hall-of-fame") DASHBOARD_DIR = os.path.join(REPO_ROOT, "tools", "miner_dashboard") + +def _attest_mapping(value): + """Return a dict-like payload section or an empty mapping.""" + return value if isinstance(value, dict) else {} + + +_ATTEST_MINER_RE = re.compile(r"^[A-Za-z0-9._:-]{1,128}$") + + +def _attest_text(value): + """Accept only non-empty text values from untrusted attestation input.""" + if isinstance(value, str): + value = value.strip() + if value: + return value + return None + + +def _attest_valid_miner(value): + """Accept only bounded miner identifiers with a conservative character set.""" + text = _attest_text(value) + if text and _ATTEST_MINER_RE.fullmatch(text): + return text + return None + + +def _attest_field_error(code, message, status=400): + """Build a consistent error payload for malformed attestation inputs.""" + return jsonify({ + "ok": False, + "error": code.lower(), + "message": message, + "code": code, + }), status + + +def _attest_is_valid_positive_int(value, max_value=4096): + """Validate positive integer-like input without silently coercing hostile shapes.""" + if isinstance(value, bool): + return False + if isinstance(value, float): + if not math.isfinite(value) or not value.is_integer(): + return False + try: + coerced = int(value) + except (TypeError, ValueError, OverflowError): + return False + return 1 <= coerced <= max_value + + +def client_ip_from_request(req) -> str: + """Return the left-most forwarded IP when present, otherwise the remote address.""" + client_ip = req.headers.get("X-Forwarded-For", req.remote_addr) + if client_ip and "," in client_ip: + client_ip = client_ip.split(",")[0].strip() + return client_ip + + +def _attest_positive_int(value, default=1): + """Coerce untrusted integer-like values to a safe positive integer.""" + try: + coerced = int(value) + except (TypeError, ValueError): + return default + return coerced if coerced > 0 else default + + +def _attest_string_list(value): + """Coerce a list-like field into a list of non-empty strings.""" + if not isinstance(value, list): + return [] + items = [] + for item in value: + text = _attest_text(item) + if text: + items.append(text) + return items + + +def _validate_attestation_payload_shape(data): + """Reject malformed attestation payload shapes before normalization.""" + for field_name, code in ( + ("device", "INVALID_DEVICE"), + ("signals", "INVALID_SIGNALS"), + ("report", "INVALID_REPORT"), + ("fingerprint", "INVALID_FINGERPRINT"), + ): + if field_name in data and data[field_name] is not None and not isinstance(data[field_name], dict): + return _attest_field_error(code, f"Field '{field_name}' must be a JSON object") + + for field_name in ("miner", "miner_id"): + if field_name in data and data[field_name] is not None and not isinstance(data[field_name], str): + return _attest_field_error("INVALID_MINER", f"Field '{field_name}' must be a non-empty string") + + miner = _attest_valid_miner(data.get("miner")) or _attest_valid_miner(data.get("miner_id")) + if not miner and not (_attest_text(data.get("miner")) or _attest_text(data.get("miner_id"))): + return _attest_field_error( + "MISSING_MINER", + "Field 'miner' or 'miner_id' must be a non-empty identifier using only letters, numbers, '.', '_', ':' or '-'", + ) + if not miner: + return _attest_field_error( + "INVALID_MINER", + "Field 'miner' or 'miner_id' must use only letters, numbers, '.', '_', ':' or '-' and be at most 128 characters", + ) + + device = data.get("device") + if isinstance(device, dict): + if "cores" in device and not _attest_is_valid_positive_int(device.get("cores")): + return _attest_field_error("INVALID_DEVICE_CORES", "Field 'device.cores' must be a positive integer between 1 and 4096", status=422) + for field_name in ("device_family", "family", "device_arch", "arch", "device_model", "model", "cpu", "serial_number", "serial"): + if field_name in device and device[field_name] is not None and not isinstance(device[field_name], str): + return _attest_field_error("INVALID_DEVICE", f"Field 'device.{field_name}' must be a string") + + signals = data.get("signals") + if isinstance(signals, dict): + if "macs" in signals: + macs = signals.get("macs") + if not isinstance(macs, list) or any(_attest_text(mac) is None for mac in macs): + return _attest_field_error("INVALID_SIGNALS_MACS", "Field 'signals.macs' must be a list of non-empty strings") + for field_name in ("hostname", "serial"): + if field_name in signals and signals[field_name] is not None and not isinstance(signals[field_name], str): + return _attest_field_error("INVALID_SIGNALS", f"Field 'signals.{field_name}' must be a string") + + report = data.get("report") + if isinstance(report, dict): + for field_name in ("nonce", "commitment"): + if field_name in report and report[field_name] is not None and not isinstance(report[field_name], str): + return _attest_field_error("INVALID_REPORT", f"Field 'report.{field_name}' must be a string") + + fingerprint = data.get("fingerprint") + if isinstance(fingerprint, dict) and "checks" in fingerprint and not isinstance(fingerprint.get("checks"), dict): + return _attest_field_error("INVALID_FINGERPRINT_CHECKS", "Field 'fingerprint.checks' must be a JSON object") + + return None + + +def _normalize_attestation_device(device): + """Shallow-normalize device metadata so malformed JSON shapes fail closed.""" + raw = _attest_mapping(device) + normalized = {"cores": _attest_positive_int(raw.get("cores"), default=1)} + for field in ( + "device_family", + "family", + "device_arch", + "arch", + "device_model", + "model", + "cpu", + "serial_number", + "serial", + ): + text = _attest_text(raw.get(field)) + if text is not None: + normalized[field] = text + return normalized + + +def _normalize_attestation_signals(signals): + """Shallow-normalize signal metadata used by attestation validation.""" + raw = _attest_mapping(signals) + normalized = {"macs": _attest_string_list(raw.get("macs"))} + for field in ("hostname", "serial"): + text = _attest_text(raw.get(field)) + if text is not None: + normalized[field] = text + return normalized + + +def _normalize_attestation_report(report): + """Normalize report metadata used by challenge/ticket handling.""" + raw = _attest_mapping(report) + normalized = {} + for field in ("nonce", "commitment"): + text = _attest_text(raw.get(field)) + if text is not None: + normalized[field] = text + return normalized + # Register Hall of Rust blueprint (tables initialized after DB_PATH is set) try: from hall_of_rust import hall_bp @@ -981,9 +1160,13 @@ def validate_fingerprint_data(fingerprint: dict, claimed_device: dict = None) -> if not fingerprint: # FIX #305: Missing fingerprint data is a validation failure return False, "no_fingerprint_data" + if not isinstance(fingerprint, dict): + return False, "fingerprint_not_dict" checks = fingerprint.get("checks", {}) - claimed_device = claimed_device or {} + if not isinstance(checks, dict): + checks = {} + claimed_device = claimed_device if isinstance(claimed_device, dict) else {} # FIX #305: Reject empty fingerprint payloads (e.g. fingerprint={} or checks={}) if not checks: @@ -1793,23 +1976,25 @@ def _check_hardware_binding(miner_id: str, device: dict, signals: dict = None, s def submit_attestation(): """Submit hardware attestation with fingerprint validation""" data = request.get_json(silent=True) - - # Type guard: reject non-dict JSON payloads (null, array, scalar) if not isinstance(data, dict): - return jsonify({"ok": False, "error": "Request body must be a JSON object", "code": "INVALID_JSON_OBJECT"}), 400 + return jsonify({ + "ok": False, + "error": "invalid_json_object", + "message": "Expected a JSON object request body", + "code": "INVALID_JSON_OBJECT" + }), 400 + payload_error = _validate_attestation_payload_shape(data) + if payload_error is not None: + return payload_error # Extract client IP (handle nginx proxy) - client_ip = request.headers.get("X-Forwarded-For", request.remote_addr) - if client_ip and "," in client_ip: - client_ip = client_ip.split(",")[0].strip() # First IP in chain + client_ip = client_ip_from_request(request) - # Extract attestation data (type guards for fuzz safety) - miner = data.get('miner') or data.get('miner_id') - if miner is not None and not isinstance(miner, str): - miner = str(miner) - report = data.get('report', {}) if isinstance(data.get('report'), dict) else {} - nonce = report.get('nonce') or data.get('nonce') - device = data.get('device', {}) if isinstance(data.get('device'), dict) else {} + # Extract attestation data + miner = _attest_valid_miner(data.get('miner')) or _attest_valid_miner(data.get('miner_id')) + report = _normalize_attestation_report(data.get('report')) + nonce = report.get('nonce') or _attest_text(data.get('nonce')) + device = _normalize_attestation_device(data.get('device')) # IP rate limiting (Security Hardening 2026-02-02) ip_ok, ip_reason = check_ip_rate_limit(client_ip, miner) @@ -1821,12 +2006,8 @@ def submit_attestation(): "message": "Too many unique miners from this IP address", "code": "IP_RATE_LIMIT" }), 429 - signals = data.get('signals', {}) if isinstance(data.get('signals'), dict) else {} - fingerprint = data.get('fingerprint') # FIX #305: None default to detect missing vs empty - - # Basic validation - if not miner: - miner = f"anon_{secrets.token_hex(8)}" + signals = _normalize_attestation_signals(data.get('signals')) + fingerprint = _attest_mapping(data.get('fingerprint')) # NEW: Extract fingerprint # SECURITY: Check blocked wallets with sqlite3.connect(DB_PATH) as conn: @@ -1838,9 +2019,9 @@ def submit_attestation(): # SECURITY: Hardware binding check v2.0 (serial + entropy validation) serial = device.get('serial_number') or device.get('serial') or signals.get('serial') - cores = device.get('cores', 1) - arch = device.get('arch') or device.get('device_arch', 'modern') - macs = signals.get('macs', []) + cores = _attest_positive_int(device.get('cores'), default=1) + arch = _attest_text(device.get('arch')) or _attest_text(device.get('device_arch')) or 'modern' + macs = _attest_string_list(signals.get('macs')) if HW_BINDING_V2 and serial: hw_ok, hw_msg, hw_details = bind_hardware_v2( @@ -1873,7 +2054,6 @@ def submit_attestation(): }), 409 # RIP-0147a: Check OUI gate - macs = signals.get('macs', []) if macs: oui_ok, oui_info = _check_oui_gate(macs) if not oui_ok: diff --git a/tests/attestation_corpus/malformed_miner_array.json b/tests/attestation_corpus/malformed_miner_array.json new file mode 100644 index 00000000..e6f82bfe --- /dev/null +++ b/tests/attestation_corpus/malformed_miner_array.json @@ -0,0 +1,21 @@ +{ + "miner": [ + "not", + "a", + "string" + ], + "device": { + "device_family": "PowerPC", + "device_arch": "power8", + "cores": 4 + }, + "signals": { + "hostname": "miner-array-host", + "macs": [ + "AA:BB:CC:DD:EE:33" + ] + }, + "report": { + "commitment": "miner-array-commitment" + } +} diff --git a/tests/attestation_corpus/malformed_report_scalar.json b/tests/attestation_corpus/malformed_report_scalar.json new file mode 100644 index 00000000..1cc04dc3 --- /dev/null +++ b/tests/attestation_corpus/malformed_report_scalar.json @@ -0,0 +1,15 @@ +{ + "miner": "report-scalar-miner", + "device": { + "device_family": "PowerPC", + "device_arch": "power8", + "cores": 8 + }, + "signals": { + "hostname": "report-scalar-host", + "macs": [ + "AA:BB:CC:DD:EE:44" + ] + }, + "report": "not-a-report-object" +} diff --git a/tests/replay_attestation_corpus.py b/tests/replay_attestation_corpus.py new file mode 100644 index 00000000..e029ed10 --- /dev/null +++ b/tests/replay_attestation_corpus.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python3 +""" +Replay a saved attestation corpus entry against the Flask test client. +""" + +from __future__ import annotations + +import argparse +import importlib.util +import json +import os +import sqlite3 +import sys +import uuid +from pathlib import Path + +PROJECT_ROOT = Path(__file__).resolve().parents[1] +NODE_PATH = PROJECT_ROOT / "node" / "rustchain_v2_integrated_v2.2.1_rip200.py" +TMP_ROOT = PROJECT_ROOT / "tests" / ".tmp_attestation" + +sys.path.insert(0, str(PROJECT_ROOT)) +sys.path.insert(0, str(PROJECT_ROOT / "node")) + +os.environ.setdefault("RC_ADMIN_KEY", "0" * 32) +os.environ.setdefault("DB_PATH", ":memory:") + +from tests import mock_crypto + +sys.modules["rustchain_crypto"] = mock_crypto + + +def _load_integrated_node(): + if "integrated_node" in sys.modules: + return sys.modules["integrated_node"] + + spec = importlib.util.spec_from_file_location("integrated_node", NODE_PATH) + module = importlib.util.module_from_spec(spec) + sys.modules["integrated_node"] = module + spec.loader.exec_module(module) + return module + + +def _init_attestation_db(db_path: Path) -> None: + conn = sqlite3.connect(db_path) + conn.executescript( + """ + CREATE TABLE blocked_wallets ( + wallet TEXT PRIMARY KEY, + reason TEXT + ); + CREATE TABLE balances ( + miner_pk TEXT PRIMARY KEY, + balance_rtc REAL DEFAULT 0 + ); + CREATE TABLE epoch_enroll ( + epoch INTEGER NOT NULL, + miner_pk TEXT NOT NULL, + weight REAL NOT NULL, + PRIMARY KEY (epoch, miner_pk) + ); + CREATE TABLE miner_header_keys ( + miner_id TEXT PRIMARY KEY, + pubkey_hex TEXT + ); + CREATE TABLE tickets ( + ticket_id TEXT PRIMARY KEY, + expires_at INTEGER NOT NULL, + commitment TEXT + ); + CREATE TABLE oui_deny ( + oui TEXT PRIMARY KEY, + vendor TEXT, + enforce INTEGER DEFAULT 0 + ); + """ + ) + conn.commit() + conn.close() + + +def _apply_test_overrides(module, db_path: Path): + original = { + "DB_PATH": getattr(module, "DB_PATH", None), + "HW_BINDING_V2": getattr(module, "HW_BINDING_V2", None), + "HW_PROOF_AVAILABLE": getattr(module, "HW_PROOF_AVAILABLE", None), + "check_ip_rate_limit": module.check_ip_rate_limit, + "_check_hardware_binding": module._check_hardware_binding, + "record_attestation_success": module.record_attestation_success, + "record_macs": module.record_macs, + "current_slot": module.current_slot, + "slot_to_epoch": module.slot_to_epoch, + } + + module.DB_PATH = str(db_path) + module.HW_BINDING_V2 = False + module.HW_PROOF_AVAILABLE = False + module.check_ip_rate_limit = lambda client_ip, miner_id: (True, "ok") + module._check_hardware_binding = lambda *args, **kwargs: (True, "ok", "") + module.record_attestation_success = lambda *args, **kwargs: None + module.record_macs = lambda *args, **kwargs: None + module.current_slot = lambda: 12345 + module.slot_to_epoch = lambda slot: 85 + module.app.config["TESTING"] = True + return original + + +def _restore_test_overrides(module, original): + for name, value in original.items(): + setattr(module, name, value) + + +def parse_args(): + parser = argparse.ArgumentParser(description="Replay a saved attestation corpus JSON file") + parser.add_argument("corpus_file", type=Path, help="Path to a JSON corpus entry") + return parser.parse_args() + + +def main() -> int: + args = parse_args() + payload_path = args.corpus_file + if not payload_path.exists(): + raise SystemExit(f"Corpus file not found: {payload_path}") + + raw_json = payload_path.read_text(encoding="utf-8") + module = _load_integrated_node() + + TMP_ROOT.mkdir(exist_ok=True) + db_path = TMP_ROOT / f"replay_{uuid.uuid4().hex}.sqlite3" + _init_attestation_db(db_path) + original = _apply_test_overrides(module, db_path) + try: + with module.app.test_client() as client: + response = client.post("/attest/submit", data=raw_json, content_type="application/json") + print( + json.dumps( + { + "corpus_file": str(payload_path), + "status_code": response.status_code, + "response_json": response.get_json(silent=True), + }, + indent=2, + sort_keys=True, + ) + ) + return 0 if response.status_code < 500 else 1 + finally: + _restore_test_overrides(module, original) + if db_path.exists(): + try: + db_path.unlink() + except PermissionError: + pass + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_attestation_fuzz.py b/tests/test_attestation_fuzz.py index d90fb1ab..9a4f247d 100644 --- a/tests/test_attestation_fuzz.py +++ b/tests/test_attestation_fuzz.py @@ -45,6 +45,14 @@ def _init_attestation_db(db_path: Path) -> None: vendor TEXT, enforce INTEGER DEFAULT 0 ); + CREATE TABLE hardware_bindings ( + hardware_id TEXT PRIMARY KEY, + bound_miner TEXT NOT NULL, + device_arch TEXT, + device_model TEXT, + bound_at INTEGER, + attestation_count INTEGER DEFAULT 0 + ); """ ) conn.commit() @@ -84,22 +92,22 @@ def _base_payload() -> dict: } -@pytest.fixture -def client(monkeypatch): +def _client_fixture(monkeypatch, *, strict_security_path=False): local_tmp_dir = Path(__file__).parent / ".tmp_attestation" local_tmp_dir.mkdir(exist_ok=True) db_path = local_tmp_dir / f"{uuid.uuid4().hex}.sqlite3" _init_attestation_db(db_path) monkeypatch.setattr(integrated_node, "DB_PATH", str(db_path)) - monkeypatch.setattr(integrated_node, "HW_BINDING_V2", False, raising=False) - monkeypatch.setattr(integrated_node, "HW_PROOF_AVAILABLE", False, raising=False) monkeypatch.setattr(integrated_node, "check_ip_rate_limit", lambda client_ip, miner_id: (True, "ok")) - monkeypatch.setattr(integrated_node, "_check_hardware_binding", lambda *args, **kwargs: (True, "ok", "")) monkeypatch.setattr(integrated_node, "record_attestation_success", lambda *args, **kwargs: None) monkeypatch.setattr(integrated_node, "record_macs", lambda *args, **kwargs: None) monkeypatch.setattr(integrated_node, "current_slot", lambda: 12345) monkeypatch.setattr(integrated_node, "slot_to_epoch", lambda slot: 85) + monkeypatch.setattr(integrated_node, "HW_BINDING_V2", False, raising=False) + monkeypatch.setattr(integrated_node, "HW_PROOF_AVAILABLE", False, raising=False) + if not strict_security_path: + monkeypatch.setattr(integrated_node, "_check_hardware_binding", lambda *args, **kwargs: (True, "ok", "")) integrated_node.app.config["TESTING"] = True with integrated_node.app.test_client() as test_client: @@ -112,6 +120,16 @@ def client(monkeypatch): pass +@pytest.fixture +def client(monkeypatch): + yield from _client_fixture(monkeypatch, strict_security_path=False) + + +@pytest.fixture +def strict_client(monkeypatch): + yield from _client_fixture(monkeypatch, strict_security_path=True) + + def _post_raw_json(client, raw_json: str): return client.post("/attest/submit", data=raw_json, content_type="application/json") @@ -132,31 +150,99 @@ def test_attest_submit_rejects_non_object_json(client, file_name, expected_statu @pytest.mark.parametrize( - "file_name", + ("file_name", "expected_code"), [ - "malformed_device_scalar.json", - "malformed_signals_scalar.json", - "malformed_signals_macs_object.json", - "malformed_fingerprint_checks_array.json", + ("malformed_device_scalar.json", "INVALID_DEVICE"), + ("malformed_miner_array.json", "INVALID_MINER"), + ("malformed_signals_scalar.json", "INVALID_SIGNALS"), + ("malformed_signals_macs_object.json", "INVALID_SIGNALS_MACS"), + ("malformed_fingerprint_checks_array.json", "INVALID_FINGERPRINT_CHECKS"), + ("malformed_report_scalar.json", "INVALID_REPORT"), ], ) -def test_attest_submit_corpus_cases_do_not_raise_server_errors(client, file_name): +def test_attest_submit_rejects_malformed_payload_shapes(client, file_name, expected_code): response = _post_raw_json(client, (CORPUS_DIR / file_name).read_text(encoding="utf-8")) - assert response.status_code < 500 - assert response.get_json()["ok"] is True + assert response.status_code in (400, 422) + assert response.get_json()["ok"] is False + assert response.get_json()["code"] == expected_code + + +@pytest.mark.parametrize( + ("payload", "expected_code"), + [ + ({"miner": "", "device": {"cores": 8}, "signals": {"macs": ["AA:BB:CC:DD:EE:10"]}, "report": {}}, "MISSING_MINER"), + ({"miner": " ", "device": {"cores": 8}, "signals": {"macs": ["AA:BB:CC:DD:EE:10"]}, "report": {}}, "MISSING_MINER"), + ({"miner": "fuzz\u200bminer", "device": {"cores": 8}, "signals": {"macs": ["AA:BB:CC:DD:EE:10"]}, "report": {}}, "INVALID_MINER"), + ({"miner": "'; DROP TABLE balances; --", "device": {"cores": 8}, "signals": {"macs": ["AA:BB:CC:DD:EE:10"]}, "report": {}}, "INVALID_MINER"), + ({"miner": "f" * 129, "device": {"cores": 8}, "signals": {"macs": ["AA:BB:CC:DD:EE:10"]}, "report": {}}, "INVALID_MINER"), + ({"miner": "fuzz-miner", "device": {"cores": "999999999999999999999999"}, "signals": {"macs": ["AA:BB:CC:DD:EE:10"]}, "report": {}}, "INVALID_DEVICE_CORES"), + ({"miner": "fuzz-miner", "device": {"cores": []}, "signals": {"macs": ["AA:BB:CC:DD:EE:10"]}, "report": {}}, "INVALID_DEVICE_CORES"), + ({"miner": "fuzz-miner", "device": {"cores": 8}, "signals": {"macs": ["AA:BB:CC:DD:EE:10", None]}, "report": {}}, "INVALID_SIGNALS_MACS"), + ({"miner": "fuzz-miner", "device": {"cores": 8, "cpu": ["nested"]}, "signals": {"macs": ["AA:BB:CC:DD:EE:10"]}, "report": {}}, "INVALID_DEVICE"), + ({"miner": "fuzz-miner", "device": {"cores": 8}, "signals": {"hostname": ["nested"], "macs": ["AA:BB:CC:DD:EE:10"]}, "report": {}}, "INVALID_SIGNALS"), + ({"miner": "fuzz-miner", "device": {"cores": 8}, "signals": {"macs": ["AA:BB:CC:DD:EE:10"]}, "report": {"nonce": {"nested": "bad"}}}, "INVALID_REPORT"), + ], +) +def test_attest_submit_rejects_attack_vector_shapes(client, payload, expected_code): + response = client.post("/attest/submit", json=payload) + + assert response.status_code in (400, 422) + assert response.get_json()["ok"] is False + assert response.get_json()["code"] == expected_code + + +def test_attest_submit_sql_like_miner_does_not_mutate_schema(client): + payload = _base_payload() + payload["miner"] = "'; DROP TABLE balances; --" + + response = client.post("/attest/submit", json=payload) + + assert response.status_code == 400 + assert response.get_json()["code"] == "INVALID_MINER" + + +def test_validate_fingerprint_data_rejects_non_dict_input(): + passed, reason = integrated_node.validate_fingerprint_data(["not", "a", "dict"]) + + assert passed is False + assert reason == "fingerprint_not_dict" + + +def test_attest_submit_strict_fixture_rejects_malformed_fingerprint(strict_client): + payload = _base_payload() + payload["fingerprint"]["checks"] = [] + + response = strict_client.post("/attest/submit", json=payload) + + assert response.status_code == 400 + assert response.get_json()["ok"] is False + assert response.get_json()["code"] == "INVALID_FINGERPRINT_CHECKS" + + +def test_attest_submit_strict_fixture_enforces_hardware_binding(strict_client): + first = _base_payload() + second = _base_payload() + second["miner"] = "different-miner" + + first_response = strict_client.post("/attest/submit", json=first) + second_response = strict_client.post("/attest/submit", json=second) + + assert first_response.status_code == 200 + assert second_response.status_code == 409 + assert second_response.get_json()["code"] == "DUPLICATE_HARDWARE" def _mutate_payload(rng: random.Random) -> dict: payload = _base_payload() - mutation = rng.randrange(8) + mutation = rng.randrange(14) if mutation == 0: payload["miner"] = ["not", "a", "string"] elif mutation == 1: payload["device"] = "not-a-device-object" elif mutation == 2: - payload["device"]["cores"] = rng.choice([0, -1, "NaN", [], {}]) + payload["device"]["cores"] = rng.choice([0, -1, "NaN", [], {}, "999999999999999999999999"]) elif mutation == 3: payload["signals"] = "not-a-signals-object" elif mutation == 4: @@ -171,16 +257,29 @@ def _mutate_payload(rng: random.Random) -> dict: payload["report"] = rng.choice(["not-a-report-object", [], {"commitment": ["bad"]}]) elif mutation == 6: payload["fingerprint"] = {"checks": rng.choice([[], "bad", {"anti_emulation": True}])} - else: + elif mutation == 7: payload["device"]["cpu"] = rng.choice(["qemu-system-ppc", "IBM POWER8", None, ["nested"]]) payload["signals"]["hostname"] = rng.choice(["vmware-host", "power8-host", None, ["nested"]]) + elif mutation == 8: + payload["miner"] = rng.choice(["", " ", "\t", "fuzz\u200bminer", "'; DROP TABLE balances; --"]) + elif mutation == 9: + payload["miner"] = "f" * 300 + elif mutation == 10: + payload["device"]["device_family"] = {"nested": {"too": "deep"}} + elif mutation == 11: + payload["signals"]["macs"] = ["AA:BB:CC:DD:EE:10", None] + elif mutation == 12: + payload["fingerprint"] = ["bad", "shape"] + else: + payload["report"]["nonce"] = {"nested": "bad"} return payload -def test_attest_submit_fuzz_no_unhandled_exceptions(client): +def test_attest_submit_mutation_regression_no_unhandled_exceptions(client): cases = int(os.getenv("ATTEST_FUZZ_CASES", "250")) - rng = random.Random(475) + seed = os.getenv("ATTEST_FUZZ_SEED") + rng = random.Random(int(seed)) if seed else random.Random() for index in range(cases): payload = _mutate_payload(rng)