Skip to content

Commit

Permalink
Some more refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
almet committed Jan 29, 2025
1 parent fd1db71 commit 7991a5c
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 57 deletions.
23 changes: 18 additions & 5 deletions dangerzone/container_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@

def get_runtime_name() -> str:
if platform.system() == "Linux":
runtime_name = "podman"
else:
# Windows, Darwin, and unknown use docker for now, dangerzone-vm eventually
runtime_name = "docker"
return runtime_name
return "podman"
# Windows, Darwin, and unknown use docker for now, dangerzone-vm eventually
return "docker"


def get_runtime_version() -> Tuple[int, int]:
Expand Down Expand Up @@ -147,3 +145,18 @@ def load_image_tarball() -> None:
)

log.info("Successfully installed container image from")


def container_pull(image: str) -> bool:
"""Pull a container image from a registry."""
cmd = [get_runtime_name(), "pull", f"{image}"]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
process.communicate()
return process.returncode == 0


def load_image_hash(image: str) -> str:
"""Returns a image hash from a local image name"""
cmd = [get_runtime_name(), "image", "inspect", image, "-f", "{{.Digest}}"]
result = subprocess.run(cmd, capture_output=True, check=True)
return result.stdout.strip().decode().strip("sha256:")
1 change: 0 additions & 1 deletion dangerzone/updater/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import logging

log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
41 changes: 32 additions & 9 deletions dangerzone/updater/cli.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,63 @@
#!/usr/bin/python

import logging

import click

from . import registry
from ..util import get_resource_path
from . import errors, log, registry
from .attestations import verify_attestation
from .signatures import upgrade_container_image, verify_offline_image_signature

DEFAULT_REPOSITORY = "freedomofpress/dangerzone"

PUBKEY_DEFAULT_LOCATION = get_resource_path("freedomofpress-dangerzone-pub.key")


@click.group()
def main() -> None:
pass
@click.option("--debug", is_flag=True)
def main(debug=False) -> None:
if debug:
click.echo("Debug mode enabled")
level = logging.DEBUG
else:
level = logging.INFO
logging.basicConfig(level=level)


@main.command()
@click.option("--image")
@click.option("--pubkey", default="pub.key")
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
@click.option("--airgap", is_flag=True)
# XXX Add options to do airgap upgrade
def upgrade(image: str, pubkey: str) -> None:
def upgrade(image: str, pubkey: str, airgap: bool) -> None:
"""Upgrade the image to the latest signed version."""
manifest_hash = registry.get_manifest_hash(image)
if upgrade_container_image(image, manifest_hash, pubkey):
try:
is_upgraded = upgrade_container_image(image, manifest_hash, pubkey)
click.echo(f"✅ The local image {image} has been upgraded")
except errors.ImageAlreadyUpToDate as e:
click.echo(f"✅ {e}")
raise click.Abort()


@main.command()
@click.argument("image")
@click.option("--pubkey", default="pub.key")
def verify_local(image: str, pubkey: str) -> None:
@click.option("--pubkey", default=PUBKEY_DEFAULT_LOCATION)
def verify_offline(image: str, pubkey: str) -> None:
"""
Verify the local image signature against a public key and the stored signatures.
"""
# XXX remove a potentiel :tag
if verify_offline_image_signature(image, pubkey):
click.echo(f"✅ The local image {image} has been signed with {pubkey}")
click.echo(
(
f"Verifying the local image:\n\n"
f"pubkey: {pubkey}\n"
f"image: {image}\n\n"
f"✅ The local image {image} has been signed with {pubkey}"
)
)


@main.command()
Expand Down
38 changes: 38 additions & 0 deletions dangerzone/updater/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
class UpdaterError(Exception):
pass


class ImageAlreadyUpToDate(UpdaterError):
pass


class SignatureError(UpdaterError):
pass


class RegistryError(UpdaterError):
pass


class NoRemoteSignatures(SignatureError):
pass


class SignatureVerificationError(SignatureError):
pass


class SignaturesFolderDoesNotExist(SignatureError):
pass


class InvalidSignatures(SignatureError):
pass


class SignatureMismatch(SignatureError):
pass


class LocalSignatureNotFound(SignatureError):
pass
6 changes: 3 additions & 3 deletions dangerzone/updater/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import requests

from . import log
from . import errors, log

__all__ = [
"get_manifest_hash",
Expand Down Expand Up @@ -178,7 +178,7 @@ def _get_bundle_blob_digest(layers: list) -> Optional[str]:
_find_sigstore_bundle_manifest(manifests)
)
if not bundle_manifest_digest:
raise Exception("Not able to find sigstore bundle manifest info")
raise errors.RegistryError("Not able to find sigstore bundle manifest info")

bundle_manifest = self.get_manifest(
bundle_manifest_digest, extra_headers={"Accept": bundle_manifest_mediatype}
Expand All @@ -191,7 +191,7 @@ def _get_bundle_blob_digest(layers: list) -> Optional[str]:
blob_digest = _get_bundle_blob_digest(layers)
log.info(f"Found sigstore bundle blob digest: {blob_digest}")
if not blob_digest:
raise Exception("Not able to find sigstore bundle blob info")
raise errors.RegistryError("Not able to find sigstore bundle blob info")
bundle = self.get_blob(blob_digest)
return tag_manifest_content, bundle.content

Expand Down
79 changes: 41 additions & 38 deletions dangerzone/updater/signatures.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
from tempfile import NamedTemporaryFile
from typing import Dict, List, Tuple

from ..container_utils import container_pull, load_image_hash
from . import errors, log
from .registry import get_manifest_hash

try:
import platformdirs
except ImportError:
Expand All @@ -24,10 +28,18 @@ def get_config_dir() -> Path:
"verify_signature",
"load_signatures",
"store_signatures",
"verify_local_image_signature",
"verify_offline_image_signature",
]


def is_cosign_installed() -> bool:
try:
subprocess.run(["cosign", "version"], capture_output=True, check=True)
return True
except subprocess.CalledProcessError:
return False


def signature_to_bundle(sig: Dict) -> Dict:
"""Convert a cosign-download signature to the format expected by cosign bundle."""
bundle = sig["Bundle"]
Expand Down Expand Up @@ -55,7 +67,6 @@ def verify_signature(signature: dict, pubkey: str) -> bool:

signature_bundle = signature_to_bundle(signature)

# Put the value in files and verify with cosign
with (
NamedTemporaryFile(mode="w") as signature_file,
NamedTemporaryFile(mode="bw") as payload_file,
Expand All @@ -76,50 +87,45 @@ def verify_signature(signature: dict, pubkey: str) -> bool:
signature_file.name,
payload_file.name,
]
log.debug(" ".join(cmd))
result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
# XXX Raise instead?
log.debug("Failed to verify signature", result.stderr)
return False
return result.stderr == b"Verified OK\n"


def get_runtime_name() -> str:
if platform.system() == "Linux":
return "podman"
return "docker"

if result.stderr == b"Verified OK\n":
log.debug("Signature verified")
return True
return False

def container_pull(image: str) -> bool:
# XXX - Move to container_utils.py
cmd = [get_runtime_name(), "pull", f"{image}"]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
process.communicate()
return process.returncode == 0


def new_image_release() -> bool:
# XXX - Implement
return True
def new_image_release(image) -> bool:
remote_hash = get_manifest_hash(image)
local_hash = load_image_hash(image)
log.debug("Remote hash: %s", remote_hash)
log.debug("Local hash: %s", local_hash)
return remote_hash != local_hash


def upgrade_container_image(
image: str,
manifest_hash: str,
pubkey: str,
) -> bool:
if not new_image_release():
if not new_image_release(image):
raise errors.ImageAlreadyUpToDate("The image is already up to date")
return False

# manifest_hash = registry.get_manifest_hash(tag)
signatures = get_signatures(image, manifest_hash)
log.debug("Signatures: %s", signatures)

if len(signatures) < 1:
raise Exception("Unable to retrieve signatures")
raise errors.NoRemoteSignatures("No remote signatures found")

for signature in signatures:
signature_is_valid = verify_signature(signature, pubkey)
if not signature_is_valid:
raise Exception("Unable to verify signature")
raise errors.SignatureVerificationError()

# At this point, the signatures are verified
# We store the signatures just now to avoid storing unverified signatures
Expand Down Expand Up @@ -148,9 +154,10 @@ def load_signatures(image_hash: str, pubkey: str) -> List[Dict]:
f"Cannot find a '{pubkey_signatures}' folder."
"You might need to download the image signatures first."
)
raise Exception(msg)
raise errors.SignaturesFolderDoesNotExist(msg)

with open(pubkey_signatures / f"{image_hash}.json") as f:
log.debug("Loading signatures from %s", f.name)
return json.load(f)


Expand All @@ -177,43 +184,39 @@ def _get_digest(sig: Dict) -> str:
# All the signatures should share the same hash.
hashes = list(map(_get_digest, signatures))
if len(set(hashes)) != 1:
raise Exception("Signatures do not share the same image hash")
raise errors.InvalidSignatures("Signatures do not share the same image hash")

if f"sha256:{image_hash}" != hashes[0]:
raise Exception("Signatures do not match the given image hash")
raise errors.SignatureMismatch("Signatures do not match the given image hash")

pubkey_signatures = SIGNATURES_PATH / get_file_hash(pubkey)
pubkey_signatures.mkdir(exist_ok=True)

with open(pubkey_signatures / f"{image_hash}.json", "w") as f:
log.debug(
f"Storing signatures for {image_hash} in {pubkey_signatures}/{image_hash}.json"
)
json.dump(signatures, f)


def verify_offline_image_signature(image: str, pubkey: str) -> bool:
"""
Verifies that a local image has a valid signature
"""
log.info(f"Verifying local image {image} against pubkey {pubkey}")
image_hash = load_image_hash(image)
log.debug(f"Image hash: {image_hash}")
signatures = load_signatures(image_hash, pubkey)
if len(signatures) < 1:
raise Exception("No signatures found")
raise errors.LocalSignatureNotFound("No signatures found")

for signature in signatures:
if not verify_signature(signature, pubkey):
msg = f"Unable to verify signature for {image} with pubkey {pubkey}"
raise Exception(msg)
raise errors.SignatureVerificationError(msg)
return True


def load_image_hash(image: str) -> str:
"""
Returns a image hash from a local image name
"""
cmd = [get_runtime_name(), "image", "inspect", image, "-f", "{{.Digest}}"]
result = subprocess.run(cmd, capture_output=True, check=True)
return result.stdout.strip().decode().strip("sha256:")


def get_signatures(image: str, hash: str) -> List[Dict]:
"""
Retrieve the signatures from cosign download signature and convert each one to the "cosign bundle" format.
Expand Down
2 changes: 1 addition & 1 deletion dangerzone/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
try:
import platformdirs
except ImportError:
import appdirs as platformdirs
import appdirs as platformdirs # type: ignore[no-redef]


def get_config_dir() -> str:
Expand Down
1 change: 1 addition & 0 deletions dev_scripts/dangerzone-image

0 comments on commit 7991a5c

Please sign in to comment.