Skip to content

Commit

Permalink
Verify podman/docker images against locally stored signatures
Browse files Browse the repository at this point in the history
  • Loading branch information
almet committed Jan 28, 2025
1 parent 236004c commit 59a93c6
Showing 1 changed file with 187 additions and 55 deletions.
242 changes: 187 additions & 55 deletions dev_scripts/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,24 @@
import shutil
import subprocess
from base64 import b64decode
from pathlib import Path
from tempfile import NamedTemporaryFile

import click
import requests

try:
import platformdirs
except ImportError:
import appdirs as platformdirs


def get_config_dir() -> str:
return Path(platformdirs.user_config_dir("dangerzone"))


SIGNATURES_PATH = get_config_dir() / "signatures"

DEFAULT_REPO = "freedomofpress/dangerzone"
SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json"
DOCKER_MANIFEST_DISTRIBUTION = "application/vnd.docker.distribution.manifest.v2+json"
Expand Down Expand Up @@ -190,13 +203,37 @@ def verify_attestation(
return True


def new_version_available():
def new_image_release():
# XXX - Implement
return True


def check_signature(signature_bundle, pub_key):
"""Ensure that the signature bundle has been signed by the given public key."""
def signature_to_bundle(sig):
# Convert cosign-download signatures to the format expected by cosign bundle.
bundle = sig["Bundle"]
payload = bundle["Payload"]
return {
"base64Signature": sig["Base64Signature"],
"Payload": sig["Payload"],
"cert": sig["Cert"],
"chain": sig["Chain"],
"rekorBundle": {
"SignedEntryTimestamp": bundle["SignedEntryTimestamp"],
"Payload": {
"body": payload["body"],
"integratedTime": payload["integratedTime"],
"logIndex": payload["logIndex"],
"logID": payload["logID"],
},
},
"RFC3161Timestamp": sig["RFC3161Timestamp"],
}


def verify_signature(signature, pubkey):
"""Verify a signature against a given public key"""

signature_bundle = signature_to_bundle(signature)

# Put the value in files and verify with cosign
with (
Expand All @@ -213,13 +250,14 @@ def check_signature(signature_bundle, pub_key):
"cosign",
"verify-blob",
"--key",
pub_key,
pubkey,
"--bundle",
signature_file.name,
payload_file.name,
]
result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
# XXX Raise instead?
return False
return result.stderr == b"Verified OK\n"

Expand All @@ -236,86 +274,171 @@ def container_pull(image):
process.communicate()


def upgrade_container_image(image, tag, pub_key, registry: RegistryClient):
if not new_version_available():
def upgrade_container_image(image, tag, pubkey, registry: RegistryClient):
if not new_image_release():
return

hash = registry.get_manifest_hash(tag)
signatures = get_signatures(image, hash)

if len(signatures) < 1:
raise Exception("Unable to retrieve signatures")

print(f"Found {len(signatures)} signature(s) for {image}")
for signature in signatures:
signature_is_valid = check_signature(signature, pub_key)
signature_is_valid = verify_signature(signature, pubkey)
if not signature_is_valid:
raise Exception("Unable to verify signature")
print("✅ Signature is valid")

# At this point, the signature is verified, let's upgrade
# At this point, the signatures are verified
# We store the signatures just now to avoid storing unverified signatures
store_signatures(signatures, hash, pubkey)

# let's upgrade the image
# XXX Use the hash here to avoid race conditions
container_pull(image)


def get_file_hash(file):
with open(file, "rb") as f:
content = f.read()
return hashlib.sha256(content).hexdigest()


def load_signatures(image_hash, pubkey):
pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
if not pubkey_signatures.exists():
msg = (
f"Cannot find a '{pubkey_signatures}' folder."
"You might need to download the image signatures first."
)
raise Exception(msg)

with open(pubkey_signatures / f"{image_hash}.json") as f:
return json.load(f)


def store_signatures(signatures, image_hash, pubkey):
"""
Store signatures locally in the SIGNATURE_PATH folder, like this:
~/.config/dangerzone/signatures/
└── <pubkey-hash>
└── <image-hash>.json
└── <image-hash>.json
The format used in the `.json` file is the one of `cosign download
signature`, which differs from the "bundle" one used afterwards.
It can be converted to the one expected by cosign verify --bundle with
the `signature_to_bundle()` function.
"""

def _get_digest(sig):
payload = json.loads(b64decode(sig["Payload"]))
return payload["critical"]["image"]["docker-manifest-digest"]

# All the signatures should share the same hash.
hashes = list(map(_get_digest, signatures))
if len(set(hashes)) != 1:
raise Exception("Signatures do not share the same image hash")

if f"sha256:{image_hash}" != hashes[0]:
raise Exception("Signatures do not match the given image hash")

pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
pubkey_signatures.mkdir(exist_ok=True)

with open(pubkey_signatures / f"{image_hash}.json", "w") as f:
json.dump(signatures, f)


def verify_local_image_signature(image, pubkey):
"""
Verifies that a local image has a valid signature
"""
image_hash = get_image_hash(image)
signatures = load_signatures(image_hash, pubkey)
if len(signatures) < 1:
raise Exception("No signatures found")

for signature in signatures:
if not verify_signature(signature, pubkey):
msg = f"Unable to verify signature for {image} with pubkey {pubkey}"
raise Exception(msg)
return True


def get_image_hash(image):
"""
Returns a image hash from a local image name
"""
cmd = [get_runtime_name(), "image", "inspect", image, "-f", "{{.Digest}}"]
result = subprocess.run(cmd, capture_output=True, check=True)
return result.stdout.strip().decode().strip("sha256:")


def get_signatures(image, hash):
"""
Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format.
"""

def _to_bundle(sig):
# Convert cosign-download signatures to the format expected by cosign bundle.
bundle = sig["Bundle"]
payload = bundle["Payload"]
return {
"base64Signature": sig["Base64Signature"],
"Payload": sig["Payload"],
"cert": sig["Cert"],
"chain": sig["Chain"],
"rekorBundle": {
"SignedEntryTimestamp": bundle["SignedEntryTimestamp"],
"Payload": {
"body": payload["body"],
"integratedTime": payload["integratedTime"],
"logIndex": payload["logIndex"],
"logID": payload["logID"],
},
},
"RFC3161Timestamp": sig["RFC3161Timestamp"],
}

process = subprocess.run(
["cosign", "download", "signature", f"{image}@sha256:{hash}"],
capture_output=True,
check=True,
)

# XXX: Check the output first.
# Remove the last return, split on newlines, convert from JSON
signatures_raw = process.stdout.decode("utf-8").strip().split("\n")
return list(map(json.loads, signatures_raw))

# Remove the last return, split on newlines, convert from JSON
return [_to_bundle(json.loads(sig)) for sig in signatures_raw]


def parse_image_location(input_string):
"""Parses container image location into (registry, namespace, repository, tag)"""
pattern = (
r"^"
r"(?P<registry>[a-zA-Z0-9.-]+)/"
r"(?P<namespace>[a-zA-Z0-9-]+)/"
r"(?P<repository>[^:]+)"
r"(?::(?P<tag>[a-zA-Z0-9.-]+))?"
r"$"
)
match = re.match(pattern, input_string)
if not match:
raise ValueError("Malformed image location")

return (
match.group("registry"),
match.group("namespace"),
match.group("repository"),
match.group("tag") or "latest",
)

class Image:
def __init__(self, registry, namespace, repository, tag="latest"):
self.registry = registry
self.namespace = namespace
self.repository = repository
self.tag = tag

def properties(self):
return (self.registry, self.namespace, self.repository, self.tag)

@property
def name_without_tag(self):
return f"{self.registry}/{self.namespace}/{self.repository}"

@property
def name_with_tag(self):
return f"{self.name_without_tag}:{self.tag}"

@classmethod
def from_string(cls, input_string):
"""Parses container image location into (registry, namespace, repository, tag)"""
pattern = (
r"^"
r"(?P<registry>[a-zA-Z0-9.-]+)/"
r"(?P<namespace>[a-zA-Z0-9-]+)/"
r"(?P<repository>[^:]+)"
r"(?::(?P<tag>[a-zA-Z0-9.-]+))?"
r"$"
)
match = re.match(pattern, input_string)
if not match:
raise ValueError("Malformed image location")

return cls(
match.group("registry"),
match.group("namespace"),
match.group("repository"),
match.group("tag") or "latest",
)


def parse_image_location(string):
return Image.from_string(string).properties


@click.group()
Expand All @@ -327,12 +450,21 @@ def main():
@click.argument("image")
@click.option("--pubkey", default="pub.key")
def upgrade_image(image, pubkey):
registry, org, package, tag = parse_image_location(image)
registry_client = RegistryClient(registry, org, package)
registry, namespace, repository, tag = parse_image_location(image)
registry_client = RegistryClient(registry, namespace, repository)

upgrade_container_image(image, tag, pubkey, registry_client)


@main.command()
@click.argument("image")
@click.option("--pubkey", default="pub.key")
def verify_local_image(image, pubkey):
# XXX remove a potentiel :tag
if verify_local_image_signature(image, pubkey):
click.echo(f"✅ The local image {image} has been signed with {pubkey}")


@main.command()
@click.argument("image")
def list_tags(image):
Expand Down

0 comments on commit 59a93c6

Please sign in to comment.