Skip to content

Commit

Permalink
Refactoring of dangerzone/updater/*
Browse files Browse the repository at this point in the history
  • Loading branch information
almet committed Jan 29, 2025
1 parent d0ab34b commit fd1db71
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 82 deletions.
4 changes: 4 additions & 0 deletions dangerzone/updater/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import logging

log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
10 changes: 5 additions & 5 deletions dangerzone/updater/attestations.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import subprocess
from tempfile import NamedTemporaryFile

from .utils import write


def verify_attestation(
manifest: bytes, attestation_bundle: bytes, image_tag: str, expected_repo: str
):
) -> bool:
"""
Look up the image attestation to see if the image has been built
on Github runners, and from a given repository.
Expand All @@ -17,8 +15,10 @@ def verify_attestation(
NamedTemporaryFile(mode="wb") as manifest_json,
NamedTemporaryFile(mode="wb") as attestation_bundle_json,
):
write(manifest_json, manifest)
write(attestation_bundle_json, attestation_bundle)
manifest_json.write(manifest)
manifest_json.flush()
attestation_bundle_json.write(attestation_bundle)
attestation_bundle_json.flush()

# Call cosign with the temporary file paths
cmd = [
Expand Down
36 changes: 17 additions & 19 deletions dangerzone/updater/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@
from .attestations import verify_attestation
from .signatures import upgrade_container_image, verify_offline_image_signature

DEFAULT_REPO = "freedomofpress/dangerzone"
DEFAULT_REPOSITORY = "freedomofpress/dangerzone"


@click.group()
def main():
def main() -> None:
pass


@main.command()
@click.argument("image")
@click.option("--image")
@click.option("--pubkey", default="pub.key")
@click.option("--airgap", is_flag=True)
# XXX Add options to do airgap upgrade
def upgrade(image, pubkey):
def upgrade(image: str, pubkey: str) -> None:
manifest_hash = registry.get_manifest_hash(image)
if upgrade_container_image(image, manifest_hash, pubkey):
click.echo(f"✅ The local image {image} has been upgraded")
Expand All @@ -27,9 +28,9 @@ def upgrade(image, pubkey):
@main.command()
@click.argument("image")
@click.option("--pubkey", default="pub.key")
def verify_local(image, pubkey):
def verify_local(image: str, pubkey: str) -> None:
"""
XXX document
Verify the local image signature against a public key and the stored signatures.
"""
# XXX remove a potentiel :tag
if verify_offline_image_signature(image, pubkey):
Expand All @@ -38,28 +39,26 @@ def verify_local(image, pubkey):

@main.command()
@click.argument("image")
def list_tags(image):
click.echo(f"Existing tags for {client.image}")
def list_remote_tags(image: str) -> None:
click.echo(f"Existing tags for {image}")
for tag in registry.list_tags(image):
click.echo(tag)


@main.command()
@click.argument("image")
@click.argument("tag")
def get_manifest(image, tag):
click.echo(registry.get_manifest(image, tag))
def get_manifest(image: str) -> None:
click.echo(registry.get_manifest(image))


@main.command()
@click.argument("image")
@click.option(
"--repo",
default=DEFAULT_REPO,
"--repository",
default=DEFAULT_REPOSITORY,
help="The github repository to check the attestation for",
)
# XXX use a consistent naming for these cli commands
def attest(image: str, repo: str):
def attest_provenance(image: str, repository: str) -> None:
"""
Look up the image attestation to see if the image has been built
on Github runners, and from a given repository.
Expand All @@ -68,14 +67,13 @@ def attest(image: str, repo: str):
# if shutil.which("cosign") is None:
# click.echo("The cosign binary is needed but not installed.")
# raise click.Abort()
# XXX: refactor parse_image_location to return a dict.
_, _, _, image_tag = registry.parse_image_location(image)
parsed = registry.parse_image_location(image)
manifest, bundle = registry.get_attestation(image)

verified = verify_attestation(manifest, bundle, image_tag, repo)
verified = verify_attestation(manifest, bundle, parsed.tag, repository)
if verified:
click.echo(
f"🎉 The image available at `{client.image}:{image_tag}` has been built by Github Runners from the `{repo}` repository"
f"🎉 The image available at `{parsed.full_name}` has been built by Github Runners from the `{repository}` repository"
)


Expand Down
108 changes: 66 additions & 42 deletions dangerzone/updater/registry.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import hashlib
import re
from collections import namedtuple
from typing import Dict, Optional, Tuple

import requests

from . import log

__all__ = [
"get_manifest_hash",
"list_tags",
"get_manifest",
"get_attestation",
]

SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json"
Expand All @@ -15,40 +20,51 @@
OCI_IMAGE_MANIFEST = "application/vnd.oci.image.manifest.v1+json"


def parse_image_location(input_string: str) -> Tuple[str, str, str, str]:
"""Parses container image location into (registry, namespace, repository, tag)"""
class Image(namedtuple("Image", ["registry", "namespace", "image_name", "tag"])):
__slots__ = ()

@property
def full_name(self) -> str:
tag = f":{self.tag}" if self.tag else ""
return f"{self.registry}/{self.namespace}/{self.image_name}{tag}"


def parse_image_location(input_string: str) -> Image:
"""Parses container image location into an Image namedtuple"""
pattern = (
r"^"
r"(?P<registry>[a-zA-Z0-9.-]+)/"
r"(?P<namespace>[a-zA-Z0-9-]+)/"
r"(?P<repository>[^:]+)"
r"(?P<image_name>[^:]+)"
r"(?::(?P<tag>[a-zA-Z0-9.-]+))?"
r"$"
)
match = re.match(pattern, input_string)
if not match:
raise ValueError("Malformed image location")

return (
match.group("registry"),
match.group("namespace"),
match.group("repository"),
match.group("tag") or "latest",
return Image(
registry=match.group("registry"),
namespace=match.group("namespace"),
image_name=match.group("image_name"),
tag=match.group("tag") or "latest",
)


class RegistryClient:
def __init__(self, registry: str, org: str, image: str):
self._registry = registry
self._org = org
def __init__(
self,
image: Image | str,
):
if isinstance(image, str):
image = parse_image_location(image)

self._image = image
self._registry = image.registry
self._namespace = image.namespace
self._image_name = image.image_name
self._auth_token = None
self._base_url = f"https://{registry}"
self._image_url = f"{self._base_url}/v2/{self._org}/{self._image}"

@property
def image(self):
return f"{self._registry}/{self._org}/{self._image}"
self._base_url = f"https://{self._registry}"
self._image_url = f"{self._base_url}/v2/{self._namespace}/{self._image_name}"

def get_auth_token(self) -> Optional[str]:
if not self._auth_token:
Expand All @@ -57,7 +73,7 @@ def get_auth_token(self) -> Optional[str]:
auth_url,
params={
"service": f"{self._registry}",
"scope": f"repository:{self._org}/{self._image}:pull",
"scope": f"repository:{self._namespace}/{self._image_name}:pull",
},
)
response.raise_for_status()
Expand All @@ -74,7 +90,9 @@ def list_tags(self) -> list:
tags = response.json().get("tags", [])
return tags

def get_manifest(self, tag, extra_headers=None) -> requests.Response:
def get_manifest(
self, tag: str, extra_headers: Optional[dict] = None
) -> requests.Response:
"""Get manifest information for a specific tag"""
manifest_url = f"{self._image_url}/manifests/{tag}"
headers = {
Expand All @@ -88,7 +106,7 @@ def get_manifest(self, tag, extra_headers=None) -> requests.Response:
response.raise_for_status()
return response

def list_manifests(self, tag) -> list:
def list_manifests(self, tag: str) -> list:
return (
self.get_manifest(
tag,
Expand All @@ -100,7 +118,7 @@ def list_manifests(self, tag) -> list:
.get("manifests")
)

def get_blob(self, hash) -> requests.Response:
def get_blob(self, hash: str) -> requests.Response:
url = f"{self._image_url}/blobs/{hash}"
response = requests.get(
url,
Expand All @@ -111,13 +129,15 @@ def get_blob(self, hash) -> requests.Response:
response.raise_for_status()
return response

def get_manifest_hash(self, tag, tag_manifest_content=None) -> str:
def get_manifest_hash(
self, tag: str, tag_manifest_content: Optional[bytes] = None
) -> str:
if not tag_manifest_content:
tag_manifest_content = self.get_manifest(tag).content

return hashlib.sha256(tag_manifest_content).hexdigest()

def get_attestation(self, tag) -> Tuple[bytes, bytes]:
def get_attestation(self, tag: str) -> Tuple[bytes, bytes]:
"""
Retrieve an attestation from a given tag.
Expand All @@ -129,15 +149,20 @@ def get_attestation(self, tag) -> Tuple[bytes, bytes]:
Returns a tuple with the tag manifest content and the bundle content.
"""

def _find_sigstore_bundle_manifest(manifests):
# FIXME: do not only rely on the first layer
def _find_sigstore_bundle_manifest(
manifests: list,
) -> Tuple[Optional[str], Optional[str]]:
for manifest in manifests:
if manifest["artifactType"] == SIGSTORE_BUNDLE:
return manifest["mediaType"], manifest["digest"]
return None, None

def _get_bundle_blob_digest(layers):
def _get_bundle_blob_digest(layers: list) -> Optional[str]:
for layer in layers:
if layer.get("mediaType") == SIGSTORE_BUNDLE:
return layer["digest"]
return None

tag_manifest_content = self.get_manifest(tag).content

Expand All @@ -164,30 +189,29 @@ def _get_bundle_blob_digest(layers):
layers = bundle_manifest.get("layers", [])

blob_digest = _get_bundle_blob_digest(layers)
log.info(f"Found sigstore bundle blob digest: {blob_digest}")
if not blob_digest:
raise Exception("Not able to find sigstore bundle blob info")
bundle = self.get_blob(blob_digest)
return tag_manifest_content, bundle.content


def get_manifest_hash(image: str) -> str:
registry, org, package, tag = parse_image_location(image)
client = RegistryClient(registry, org, package)
return client.get_manifest_hash(tag)
def get_manifest_hash(image_str: str) -> str:
image = parse_image_location(image_str)
return RegistryClient(image).get_manifest_hash(image.tag)


def list_tags(image: str) -> list:
registry, org, package, _ = parse_image_location(image)
client = RegistryClient(registry, org, package)
return client.list_tags()
def list_tags(image_str: str) -> list:
return RegistryClient(image_str).list_tags()


def get_manifest(image: str, tag: str) -> bytes:
registry, org, package, _ = parse_image_location(image)
client = RegistryClient(registry, org, package)
resp = client.get_manifest(tag, extra_headers={"Accept": OCI_IMAGE_MANIFEST})
def get_manifest(image_str: str) -> bytes:
image = parse_image_location(image_str)
client = RegistryClient(image)
resp = client.get_manifest(image.tag, extra_headers={"Accept": OCI_IMAGE_MANIFEST})
return resp.content


def get_attestation(image: str) -> Tuple[bytes, bytes]:
registry, org, package, tag = parse_image_location(image)
client = RegistryClient(registry, org, package)
return client.get_attestation(tag)
def get_attestation(image_str: str) -> Tuple[bytes, bytes]:
image = parse_image_location(image_str)
return RegistryClient(image).get_attestation(image.tag)
Loading

0 comments on commit fd1db71

Please sign in to comment.