From 96347e2e10cef721d41e7403cad10cf6b12cd6c6 Mon Sep 17 00:00:00 2001 From: AmozPay Date: Thu, 6 Oct 2022 10:23:14 +0200 Subject: [PATCH 1/9] refacto: commands were cluttered + no cli help Solution: put all commands in a sub folder and use typer subcommands Use typer.Option and typer.Argument to document each command parameter --- src/aleph_client/__main__.py | 592 +--------------------- src/aleph_client/commands/aggregate.py | 40 ++ src/aleph_client/commands/files.py | 100 ++++ src/aleph_client/commands/help_strings.py | 5 + src/aleph_client/commands/message.py | 192 +++++++ src/aleph_client/commands/program.py | 263 ++++++++++ src/aleph_client/commands/utils.py | 69 +++ 7 files changed, 689 insertions(+), 572 deletions(-) create mode 100644 src/aleph_client/commands/aggregate.py create mode 100644 src/aleph_client/commands/files.py create mode 100644 src/aleph_client/commands/help_strings.py create mode 100644 src/aleph_client/commands/message.py create mode 100644 src/aleph_client/commands/program.py create mode 100644 src/aleph_client/commands/utils.py diff --git a/src/aleph_client/__main__.py b/src/aleph_client/__main__.py index 9506065b..6711339e 100644 --- a/src/aleph_client/__main__.py +++ b/src/aleph_client/__main__.py @@ -1,595 +1,43 @@ -"""Aleph Client command-line interface. """ -import asyncio -import json -import logging -import os.path -import subprocess -import tempfile -from base64 import b32encode, b16decode -from enum import Enum +Aleph Client command-line interface. +""" + +from typing import Optional from pathlib import Path -from typing import Optional, Dict, List -from zipfile import BadZipFile import typer -from aleph_message.models import ( - ProgramMessage, - StoreMessage, - MessageType, - PostMessage, - ForgetMessage, - AlephMessage, - MessagesResponse, - ProgramContent, -) -from typer import echo -from aleph_client.account import _load_account from aleph_client.types import AccountFromPrivateKey -from aleph_client.utils import create_archive -from . import synchronous -from .asynchronous import ( - get_fallback_session, - StorageEnum, +from aleph_client.account import _load_account +from aleph_client.conf import settings +from .commands import ( + files, + message, + program, + help_strings, + aggregate ) -from .conf import settings - -logger = logging.getLogger(__name__) -app = typer.Typer() - -class KindEnum(str, Enum): - json = "json" -def _input_multiline() -> str: - """Prompt the user for a multiline input.""" - echo("Enter/Paste your content. 
Ctrl-D or Ctrl-Z ( windows ) to save it.") - contents = "" - while True: - try: - line = input() - except EOFError: - break - contents += line + "\n" - return contents - - -def _setup_logging(debug: bool = False): - level = logging.DEBUG if debug else logging.WARNING - logging.basicConfig(level=level) +app = typer.Typer() +app.add_typer(files.app, name="file", help="File uploading and pinning on IPFS and Aleph.im") +app.add_typer(message.app, name="message", help="Post, amend, watch and forget messages on Aleph.im") +app.add_typer(program.app, name="program", help="Upload and update programs on Aleph's VM") +app.add_typer(aggregate.app, name="aggregate", help="Manage aggregate messages on Aleph.im") @app.command() def whoami( - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), ): """ Display your public address. """ account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - echo(account.get_public_key()) - - -@app.command() -def post( - path: Optional[Path] = None, - type: str = "test", - ref: Optional[str] = None, - channel: str = settings.DEFAULT_CHANNEL, - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, - debug: bool = False, -): - """Post a message on Aleph.im.""" - - _setup_logging(debug) - - account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - storage_engine: str - content: Dict - - if path: - if not path.is_file(): - echo(f"Error: File not found: '{path}'") - raise typer.Exit(code=1) - - file_size = os.path.getsize(path) - storage_engine = ( - StorageEnum.ipfs if file_size > 4 * 1024 * 1024 else StorageEnum.storage - ) - - with open(path, "r") as fd: - content = json.load(fd) - - else: - content_raw = _input_multiline() - storage_engine = ( - StorageEnum.ipfs - if len(content_raw) > 4 * 1024 * 1024 - else StorageEnum.storage - ) - try: - content = json.loads(content_raw) - except json.decoder.JSONDecodeError: - echo("Not valid JSON") - raise typer.Exit(code=2) - - try: - result: PostMessage = synchronous.create_post( - account=account, - post_content=content, - post_type=type, - ref=ref, - channel=channel, - inline=True, - storage_engine=storage_engine, - ) - echo(result.json(indent=4)) - finally: - # Prevent aiohttp unclosed connector warning - asyncio.run(get_fallback_session().close()) - - -@app.command() -def upload( - path: Path, - channel: str = settings.DEFAULT_CHANNEL, - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, - ref: Optional[str] = None, - debug: bool = False, -): - """Upload and store a file on Aleph.im.""" - - _setup_logging(debug) - - account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - - try: - if not path.is_file(): - echo(f"Error: File not found: '{path}'") - raise typer.Exit(code=1) - - with open(path, "rb") as fd: - logger.debug("Reading file") - # TODO: Read in lazy mode instead of copying everything in memory - file_content = fd.read() - storage_engine = ( - StorageEnum.ipfs - if len(file_content) > 4 * 1024 * 1024 - else StorageEnum.storage - ) - logger.debug("Uploading file") - result: StoreMessage = synchronous.create_store( - account=account, - 
file_content=file_content, - storage_engine=storage_engine, - channel=channel, - guess_mime_type=True, - ref=ref, - ) - logger.debug("Upload finished") - echo(f"{result.json(indent=4)}") - finally: - # Prevent aiohttp unclosed connector warning - asyncio.run(get_fallback_session().close()) - - -@app.command() -def pin( - hash: str, - channel: str = settings.DEFAULT_CHANNEL, - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, - ref: Optional[str] = None, - debug: bool = False, -): - """Persist a file from IPFS on Aleph.im.""" - - _setup_logging(debug) - - account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - - try: - result: StoreMessage = synchronous.create_store( - account=account, - file_hash=hash, - storage_engine=StorageEnum.ipfs, - channel=channel, - ref=ref, - ) - logger.debug("Upload finished") - echo(f"{result.json(indent=4)}") - finally: - # Prevent aiohttp unclosed connector warning - asyncio.run(get_fallback_session().close()) - - -def yes_no_input(text: str, default: Optional[bool] = None): - while True: - if default is True: - response = input(f"{text} [Y/n] ") - elif default is False: - response = input(f"{text} [y/N] ") - else: - response = input(f"{text} ") - - if response.lower() in ("y", "yes"): - return True - elif response.lower() in ("n", "no"): - return False - elif response == "" and default is not None: - return default - else: - if default is None: - echo("Please enter 'y', 'yes', 'n' or 'no'") - else: - echo("Please enter 'y', 'yes', 'n', 'no' or nothing") - continue - - -def _prompt_for_volumes(): - while yes_no_input("Add volume ?", default=False): - comment = input("Description: ") or None - mount = input("Mount: ") - persistent = yes_no_input("Persist on VM host ?", default=False) - if persistent: - name = input("Volume name: ") - size_mib = int(input("Size in MiB: ")) - yield { - "comment": comment, - "mount": mount, - "name": name, - "persistence": "host", - "size_mib": size_mib, - } - else: - ref = input("Ref: ") - use_latest = yes_no_input("Use latest version ?", default=True) - yield { - "comment": comment, - "mount": mount, - "ref": ref, - "use_latest": use_latest, - } - - -@app.command() -def program( - path: Path, - entrypoint: str, - channel: str = settings.DEFAULT_CHANNEL, - memory: int = settings.DEFAULT_VM_MEMORY, - vcpus: int = settings.DEFAULT_VM_VCPUS, - timeout_seconds: float = settings.DEFAULT_VM_TIMEOUT, - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, - print_messages: bool = False, - print_code_message: bool = False, - print_program_message: bool = False, - runtime: str = None, - beta: bool = False, - debug: bool = False, - persistent: bool = False, -): - """Register a program to run on Aleph.im virtual machines from a zip archive.""" - - _setup_logging(debug) - - path = path.absolute() - - try: - path_object, encoding = create_archive(path) - except BadZipFile: - echo("Invalid zip archive") - raise typer.Exit(3) - except FileNotFoundError: - echo("No such file or directory") - raise typer.Exit(4) - - account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - - runtime = ( - runtime - or input(f"Ref of runtime ? 
[{settings.DEFAULT_RUNTIME_ID}] ") - or settings.DEFAULT_RUNTIME_ID - ) - - volumes = [] - for volume in _prompt_for_volumes(): - volumes.append(volume) - echo("\n") - - subscriptions: Optional[List[Dict]] - if beta and yes_no_input("Subscribe to messages ?", default=False): - content_raw = _input_multiline() - try: - subscriptions = json.loads(content_raw) - except json.decoder.JSONDecodeError: - echo("Not valid JSON") - raise typer.Exit(code=2) - else: - subscriptions = None - - try: - # Upload the source code - with open(path_object, "rb") as fd: - logger.debug("Reading file") - # TODO: Read in lazy mode instead of copying everything in memory - file_content = fd.read() - storage_engine = ( - StorageEnum.ipfs - if len(file_content) > 4 * 1024 * 1024 - else StorageEnum.storage - ) - logger.debug("Uploading file") - user_code: StoreMessage = synchronous.create_store( - account=account, - file_content=file_content, - storage_engine=storage_engine, - channel=channel, - guess_mime_type=True, - ref=None, - ) - logger.debug("Upload finished") - if print_messages or print_code_message: - echo(f"{user_code.json(indent=4)}") - program_ref = user_code.item_hash - - # Register the program - result: ProgramMessage = synchronous.create_program( - account=account, - program_ref=program_ref, - entrypoint=entrypoint, - runtime=runtime, - storage_engine=StorageEnum.storage, - channel=channel, - memory=memory, - vcpus=vcpus, - timeout_seconds=timeout_seconds, - persistent=persistent, - encoding=encoding, - volumes=volumes, - subscriptions=subscriptions, - ) - logger.debug("Upload finished") - if print_messages or print_program_message: - echo(f"{result.json(indent=4)}") - - hash: str = result.item_hash - hash_base32 = b32encode(b16decode(hash.upper())).strip(b"=").lower().decode() - - echo( - f"Your program has been uploaded on Aleph .\n\n" - "Available on:\n" - f" {settings.VM_URL_PATH.format(hash=hash)}\n" - f" {settings.VM_URL_HOST.format(hash_base32=hash_base32)}\n" - "Visualise on:\n https://explorer.aleph.im/address/" - f"{result.chain}/{result.sender}/message/PROGRAM/{hash}\n" - ) - - finally: - # Prevent aiohttp unclosed connector warning - asyncio.run(get_fallback_session().close()) - - -@app.command() -def update( - hash: str, - path: Path, - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, - print_message: bool = True, - debug: bool = False, -): - """Update the code of an existing program""" - - _setup_logging(debug) - - account = _load_account(private_key, private_key_file) - path = path.absolute() - - try: - program_message: ProgramMessage = synchronous.get_message( - item_hash=hash, message_type=ProgramMessage - ) - code_ref = program_message.content.code.ref - code_message: StoreMessage = synchronous.get_message( - item_hash=code_ref, message_type=StoreMessage - ) - - try: - path, encoding = create_archive(path) - except BadZipFile: - echo("Invalid zip archive") - raise typer.Exit(3) - except FileNotFoundError: - echo("No such file or directory") - raise typer.Exit(4) - - if encoding != program_message.content.code.encoding: - logger.error( - f"Code must be encoded with the same encoding as the previous version " - f"('{encoding}' vs '{program_message.content.code.encoding}'" - ) - raise typer.Exit(1) - - # Upload the source code - with open(path, "rb") as fd: - logger.debug("Reading file") - # TODO: Read in lazy mode instead of copying everything in memory - file_content = fd.read() - logger.debug("Uploading file") - 
result = synchronous.create_store( - account=account, - file_content=file_content, - storage_engine=code_message.content.item_type, - channel=code_message.channel, - guess_mime_type=True, - ref=code_message.item_hash, - ) - logger.debug("Upload finished") - if print_message: - echo(f"{result.json(indent=4)}") - finally: - # Prevent aiohttp unclosed connector warning - asyncio.run(get_fallback_session().close()) - - -@app.command() -def unpersist( - hash: str, - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, - debug: bool = False, -): - """Stop a persistent virtual machine by making it non-persistent""" - - _setup_logging(debug) - - account = _load_account(private_key, private_key_file) - - existing: MessagesResponse = synchronous.get_messages(hashes=[hash]) - message: ProgramMessage = existing.messages[0] - content: ProgramContent = message.content.copy() - - content.on.persistent = False - content.replaces = message.item_hash - - result = synchronous.submit( - account=account, - content=content.dict(exclude_none=True), - message_type=message.type, - channel=message.channel, - ) - echo(f"{result.json(indent=4)}") - - -@app.command() -def amend( - hash: str, - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, - debug: bool = False, -): - """Amend an existing Aleph message.""" - - _setup_logging(debug) - - account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - - existing_message: AlephMessage = synchronous.get_message(item_hash=hash) - - editor: str = os.getenv("EDITOR", default="nano") - with tempfile.NamedTemporaryFile(suffix="json") as fd: - # Fill in message template - fd.write(existing_message.content.json(indent=4).encode()) - fd.seek(0) - - # Launch editor - subprocess.run([editor, fd.name], check=True) - - # Read new message - fd.seek(0) - new_content_json = fd.read() - - content_type = type(existing_message).__annotations__["content"] - new_content_dict = json.loads(new_content_json) - new_content = content_type(**new_content_dict) - new_content.ref = existing_message.item_hash - echo(new_content) - result = synchronous.submit( - account=account, - content=new_content.dict(), - message_type=existing_message.type, - channel=existing_message.channel, - ) - echo(f"{result.json(indent=4)}") - - -def forget_messages( - account: AccountFromPrivateKey, - hashes: List[str], - reason: Optional[str], - channel: str, -): - try: - result: ForgetMessage = synchronous.forget( - account=account, - hashes=hashes, - reason=reason, - channel=channel, - ) - echo(f"{result.json(indent=4)}") - finally: - # Prevent aiohttp unclosed connector warning - asyncio.run(get_fallback_session().close()) - - -@app.command() -def forget( - hashes: str, - reason: Optional[str] = None, - channel: str = settings.DEFAULT_CHANNEL, - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, - debug: bool = False, -): - """Forget an existing Aleph message.""" - - _setup_logging(debug) - - account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - - hash_list: List[str] = hashes.split(",") - forget_messages(account, hash_list, reason, channel) - - -@app.command() -def forget_aggregate( - key: str, - reason: Optional[str] = None, - channel: str = settings.DEFAULT_CHANNEL, - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = 
settings.PRIVATE_KEY_FILE, - debug: bool = False, -): - """Forget all the messages composing an aggregate.""" - - _setup_logging(debug) - - account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - - message_response = synchronous.get_messages( - addresses=[account.get_address()], - message_type=MessageType.aggregate.value, - content_keys=[key], - ) - hash_list = [message["item_hash"] for message in message_response["messages"]] - forget_messages(account, hash_list, reason, channel) - - -@app.command() -def watch( - ref: str, - indent: Optional[int] = None, - debug: bool = False, -): - """Watch a hash for amends and print amend hashes""" - - _setup_logging(debug) - - original: AlephMessage = synchronous.get_message(item_hash=ref) - - for message in synchronous.watch_messages( - refs=[ref], addresses=[original.content.address] - ): - echo(f"{message.json(indent=indent)}") - + typer.echo(account.get_public_key()) if __name__ == "__main__": app() diff --git a/src/aleph_client/commands/aggregate.py b/src/aleph_client/commands/aggregate.py new file mode 100644 index 00000000..ce9f3cc4 --- /dev/null +++ b/src/aleph_client/commands/aggregate.py @@ -0,0 +1,40 @@ +import typer +from typing import Optional +from aleph_client.types import AccountFromPrivateKey +from aleph_client.account import _load_account +from aleph_client.conf import settings +from pathlib import Path +from aleph_client import synchronous +from aleph_client.commands import help_strings + +from aleph_client.commands.message import forget_messages + +from aleph_client.commands.utils import setup_logging + +from aleph_message.models import MessageType + +app = typer.Typer() + +@app.command() +def forget( + key: str = typer.Argument(..., help="Aggregate item hash to be removed."), + reason: Optional[str] = typer.Option(None, help="A description of why the messages are being forgotten"), + channel: str = typer.Option(settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help = help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help = help_strings.PRIVATE_KEY_FILE), + debug: bool = False, +): + """Forget all the messages composing an aggregate.""" + + setup_logging(debug) + + account: AccountFromPrivateKey = _load_account(private_key, private_key_file) + + message_response = synchronous.get_messages( + addresses=[account.get_address()], + message_type=MessageType.aggregate.value, + content_keys=[key], + ) + hash_list = [message["item_hash"] for message in message_response["messages"]] + forget_messages(account, hash_list, reason, channel) + diff --git a/src/aleph_client/commands/files.py b/src/aleph_client/commands/files.py new file mode 100644 index 00000000..85864d0b --- /dev/null +++ b/src/aleph_client/commands/files.py @@ -0,0 +1,100 @@ +import typer +import logging +from typing import Optional +from aleph_client.types import AccountFromPrivateKey +from aleph_client.account import _load_account +from aleph_client.conf import settings +from pathlib import Path +import asyncio +from aleph_client import synchronous + +from aleph_client.commands import help_strings + +from aleph_client.asynchronous import ( + get_fallback_session, + StorageEnum, +) + +from aleph_client.commands.utils import setup_logging + +from aleph_message.models import StoreMessage + + +logger = logging.getLogger(__name__) +app = typer.Typer() + + +@app.command() +def pin( + hash: str = typer.Argument(..., help="IPFS 
hash to pin on Aleph.im"), + channel: str = typer.Option(settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + ref: Optional[str] = typer.Option(None, help=help_strings.REF), + debug: bool = False, +): + """Persist a file from IPFS on Aleph.im.""" + + setup_logging(debug) + + account: AccountFromPrivateKey = _load_account(private_key, private_key_file) + + try: + result: StoreMessage = synchronous.create_store( + account=account, + file_hash=hash, + storage_engine=StorageEnum.ipfs, + channel=channel, + ref=ref, + ) + logger.debug("Upload finished") + typer.echo(f"{result.json(indent=4)}") + finally: + # Prevent aiohttp unclosed connector warning + asyncio.run(get_fallback_session().close()) + + + +@app.command() +def upload( + path: Path = typer.Argument(..., help="Path of the file to upload"), + channel: str = typer.Option(settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + ref: Optional[str] = typer.Option(None, help=help_strings.REF), + debug: bool = False, +): + """Upload and store a file on Aleph.im.""" + + setup_logging(debug) + + account: AccountFromPrivateKey = _load_account(private_key, private_key_file) + + try: + if not path.is_file(): + typer.echo(f"Error: File not found: '{path}'") + raise typer.Exit(code=1) + + with open(path, "rb") as fd: + logger.debug("Reading file") + # TODO: Read in lazy mode instead of copying everything in memory + file_content = fd.read() + storage_engine = ( + StorageEnum.ipfs + if len(file_content) > 4 * 1024 * 1024 + else StorageEnum.storage + ) + logger.debug("Uploading file") + result: StoreMessage = synchronous.create_store( + account=account, + file_content=file_content, + storage_engine=storage_engine, + channel=channel, + guess_mime_type=True, + ref=ref, + ) + logger.debug("Upload finished") + typer.echo(f"{result.json(indent=4)}") + finally: + # Prevent aiohttp unclosed connector warning + asyncio.run(get_fallback_session().close()) diff --git a/src/aleph_client/commands/help_strings.py b/src/aleph_client/commands/help_strings.py new file mode 100644 index 00000000..76fd2dfc --- /dev/null +++ b/src/aleph_client/commands/help_strings.py @@ -0,0 +1,5 @@ +IPFS_HASH = "IPFS Content identifier (CID)" +CHANNEL = "Aleph network channel where the message is located" +PRIVATE_KEY = "Your private key. 
Cannot be used with --private-key-file" +PRIVATE_KEY_FILE = "Path to your private key file" +REF = "Checkout https://aleph-im.gitbook.io/aleph-js/api-resources-reference/posts" diff --git a/src/aleph_client/commands/message.py b/src/aleph_client/commands/message.py new file mode 100644 index 00000000..2a6001c7 --- /dev/null +++ b/src/aleph_client/commands/message.py @@ -0,0 +1,192 @@ +import json +import os.path +import subprocess +from typing import Optional, Dict, List +from pathlib import Path +import tempfile +import asyncio + +import typer + +from aleph_message.models import ( + PostMessage, + ForgetMessage, + AlephMessage, +) + + +from aleph_client import synchronous +from aleph_client.commands import help_strings +from aleph_client.types import AccountFromPrivateKey +from aleph_client.account import _load_account +from aleph_client.conf import settings + +from aleph_client.asynchronous import ( + get_fallback_session, + StorageEnum, +) + +from aleph_client.commands.utils import ( + setup_logging, + input_multiline, +) + + +app = typer.Typer() + +@app.command() +def post( + path: Optional[Path] = typer.Option(None, help="Path to the content you want to post. If omitted, you can input your content directly"), + type: str = typer.Option("test", help="Text representing the message object type"), + ref: Optional[str] = typer.Option(None, help=help_strings.REF), + channel: str = typer.Option(settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + debug: bool = False, +): + """Post a message on Aleph.im.""" + + setup_logging(debug) + + account: AccountFromPrivateKey = _load_account(private_key, private_key_file) + storage_engine: str + content: Dict + + if path: + if not path.is_file(): + typer.echo(f"Error: File not found: '{path}'") + raise typer.Exit(code=1) + + file_size = os.path.getsize(path) + storage_engine = ( + StorageEnum.ipfs if file_size > 4 * 1024 * 1024 else StorageEnum.storage + ) + + with open(path, "r") as fd: + content = json.load(fd) + + else: + content_raw = input_multiline() + storage_engine = ( + StorageEnum.ipfs + if len(content_raw) > 4 * 1024 * 1024 + else StorageEnum.storage + ) + try: + content = json.loads(content_raw) + except json.decoder.JSONDecodeError: + typer.echo("Not valid JSON") + raise typer.Exit(code=2) + + try: + result: PostMessage = synchronous.create_post( + account=account, + post_content=content, + post_type=type, + ref=ref, + channel=channel, + inline=True, + storage_engine=storage_engine, + ) + typer.echo(result.json(indent=4)) + finally: + # Prevent aiohttp unclosed connector warning + asyncio.run(get_fallback_session().close()) + + +@app.command() +def amend( + hash: str = typer.Argument(..., help="Hash reference of the message to amend"), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + debug: bool = False, +): + """Amend an existing Aleph message.""" + + setup_logging(debug) + + account: AccountFromPrivateKey = _load_account(private_key, private_key_file) + + existing_message: AlephMessage = synchronous.get_message(item_hash=hash) + + editor: str = os.getenv("EDITOR", default="nano") + with tempfile.NamedTemporaryFile(suffix="json") as fd: + # Fill in 
message template + fd.write(existing_message.content.json(indent=4).encode()) + fd.seek(0) + + # Launch editor + subprocess.run([editor, fd.name], check=True) + + # Read new message + fd.seek(0) + new_content_json = fd.read() + + content_type = type(existing_message).__annotations__["content"] + new_content_dict = json.loads(new_content_json) + new_content = content_type(**new_content_dict) + new_content.ref = existing_message.item_hash + typer.echo(new_content) + result = synchronous.submit( + account=account, + content=new_content.dict(), + message_type=existing_message.type, + channel=existing_message.channel, + ) + typer.echo(f"{result.json(indent=4)}") + + +def forget_messages( + account: AccountFromPrivateKey, + hashes: List[str], + reason: Optional[str], + channel: str, +): + try: + result: ForgetMessage = synchronous.forget( + account=account, + hashes=hashes, + reason=reason, + channel=channel, + ) + typer.echo(f"{result.json(indent=4)}") + finally: + # Prevent aiohttp unclosed connector warning + asyncio.run(get_fallback_session().close()) + + +@app.command() +def forget( + hashes: str= typer.Argument(..., help="Comma separated list of hash references of messages to forget"), + reason: Optional[str] = typer.Option(None, help="A description of why the messages are being forgotten."), + channel: str = typer.Option(settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + debug: bool = False, +): + """Forget an existing Aleph message.""" + + setup_logging(debug) + + account: AccountFromPrivateKey = _load_account(private_key, private_key_file) + + hash_list: List[str] = hashes.split(",") + forget_messages(account, hash_list, reason, channel) + + +@app.command() +def watch( + ref: str = typer.Argument(..., help="Hash reference of the message to watch"), + indent: Optional[int] = typer.Option(None, help="Number of indents to use"), + debug: bool = False, +): + """Watch a hash for amends and print amend hashes""" + + setup_logging(debug) + + original: AlephMessage = synchronous.get_message(item_hash=ref) + + for message in synchronous.watch_messages( + refs=[ref], addresses=[original.content.address] + ): + typer.echo(f"{message.json(indent=indent)}") diff --git a/src/aleph_client/commands/program.py b/src/aleph_client/commands/program.py new file mode 100644 index 00000000..0b951236 --- /dev/null +++ b/src/aleph_client/commands/program.py @@ -0,0 +1,263 @@ +import typer +from typing import Optional, Dict, List +from aleph_client.types import AccountFromPrivateKey +from aleph_client.account import _load_account +from aleph_client.conf import settings +from pathlib import Path +import asyncio +from aleph_client import synchronous +import json +from zipfile import BadZipFile +from aleph_client.commands import help_strings + +import asyncio +import json +import logging +from base64 import b32encode, b16decode +from pathlib import Path +from typing import Optional, Dict, List +from zipfile import BadZipFile + +import typer +from aleph_message.models import ( + ProgramMessage, + StoreMessage, +) + +from aleph_client.types import AccountFromPrivateKey +from aleph_client.account import _load_account +from aleph_client.utils import create_archive + +logger = logging.getLogger(__name__) +app = typer.Typer() + + + +from aleph_client.asynchronous import ( + get_fallback_session, + 
StorageEnum, +) + +from aleph_client.commands.utils import ( + setup_logging, + input_multiline, + prompt_for_volumes, + yes_no_input +) + +from aleph_message.models import ( + ProgramMessage, + StoreMessage, + MessagesResponse, + ProgramContent, +) + +app = typer.Typer() +@app.command() +def upload( + path: Path = typer.Argument(..., help="Path to your source code"), + entrypoint: str = typer.Argument(..., help="Your program entrypoint"), + channel: str = typer.Option(settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), + memory: int = typer.Option(settings.DEFAULT_VM_MEMORY, help="Maximum memory allocation on vm in MiB"), + vcpus: int = typer.Option(settings.DEFAULT_VM_VCPUS, help="Number of virtual cpus to allocate."), + timeout_seconds: float = typer.Option(settings.DEFAULT_VM_TIMEOUT, help="If vm is not called after [timeout_seconds] it will shutdown"), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + print_messages: bool = typer.Option(False), + print_code_message: bool = typer.Option(False), + print_program_message: bool = typer.Option(False), + runtime: str = typer.Option(None, help="Hash of the runtime to use for your program. Defaults to aleph debian with Python3.8 and node. You can also create your own runtime and pin it"), + beta: bool = typer.Option(False), + debug: bool = False, + persistent: bool = False, +): + """Register a program to run on Aleph.im virtual machines from a zip archive.""" + + setup_logging(debug) + + path = path.absolute() + + try: + path_object, encoding = create_archive(path) + except BadZipFile: + typer.echo("Invalid zip archive") + raise typer.Exit(3) + except FileNotFoundError: + typer.echo("No such file or directory") + raise typer.Exit(4) + + account: AccountFromPrivateKey = _load_account(private_key, private_key_file) + + runtime = ( + runtime + or input(f"Ref of runtime ? 
[{settings.DEFAULT_RUNTIME_ID}] ") + or settings.DEFAULT_RUNTIME_ID + ) + + volumes = [] + for volume in prompt_for_volumes(): + volumes.append(volume) + typer.echo("\n") + + subscriptions: Optional[List[Dict]] + if beta and yes_no_input("Subscribe to messages ?", default=False): + content_raw = input_multiline() + try: + subscriptions = json.loads(content_raw) + except json.decoder.JSONDecodeError: + typer.echo("Not valid JSON") + raise typer.Exit(code=2) + else: + subscriptions = None + + try: + # Upload the source code + with open(path_object, "rb") as fd: + logger.debug("Reading file") + # TODO: Read in lazy mode instead of copying everything in memory + file_content = fd.read() + storage_engine = ( + StorageEnum.ipfs + if len(file_content) > 4 * 1024 * 1024 + else StorageEnum.storage + ) + logger.debug("Uploading file") + user_code: StoreMessage = synchronous.create_store( + account=account, + file_content=file_content, + storage_engine=storage_engine, + channel=channel, + guess_mime_type=True, + ref=None, + ) + logger.debug("Upload finished") + if print_messages or print_code_message: + typer.echo(f"{user_code.json(indent=4)}") + program_ref = user_code.item_hash + + # Register the program + result: ProgramMessage = synchronous.create_program( + account=account, + program_ref=program_ref, + entrypoint=entrypoint, + runtime=runtime, + storage_engine=StorageEnum.storage, + channel=channel, + memory=memory, + vcpus=vcpus, + timeout_seconds=timeout_seconds, + persistent=persistent, + encoding=encoding, + volumes=volumes, + subscriptions=subscriptions, + ) + logger.debug("Upload finished") + if print_messages or print_program_message: + typer.echo(f"{result.json(indent=4)}") + + hash: str = result.item_hash + hash_base32 = b32encode(b16decode(hash.upper())).strip(b"=").lower().decode() + + typer.echo( + f"Your program has been uploaded on Aleph .\n\n" + "Available on:\n" + f" {settings.VM_URL_PATH.format(hash=hash)}\n" + f" {settings.VM_URL_HOST.format(hash_base32=hash_base32)}\n" + "Visualise on:\n https://explorer.aleph.im/address/" + f"{result.chain}/{result.sender}/message/PROGRAM/{hash}\n" + ) + + finally: + # Prevent aiohttp unclosed connector warning + asyncio.run(get_fallback_session().close()) + + +@app.command() +def update( + hash: str, + path: Path, + private_key: Optional[str] = settings.PRIVATE_KEY_STRING, + private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, + print_message: bool = True, + debug: bool = False, +): + """Update the code of an existing program""" + + setup_logging(debug) + + account = _load_account(private_key, private_key_file) + path = path.absolute() + + try: + program_message: ProgramMessage = synchronous.get_message( + item_hash=hash, message_type=ProgramMessage + ) + code_ref = program_message.content.code.ref + code_message: StoreMessage = synchronous.get_message( + item_hash=code_ref, message_type=StoreMessage + ) + + try: + path, encoding = create_archive(path) + except BadZipFile: + typer.echo("Invalid zip archive") + raise typer.Exit(3) + except FileNotFoundError: + typer.echo("No such file or directory") + raise typer.Exit(4) + + if encoding != program_message.content.code.encoding: + logger.error( + f"Code must be encoded with the same encoding as the previous version " + f"('{encoding}' vs '{program_message.content.code.encoding}'" + ) + raise typer.Exit(1) + + # Upload the source code + with open(path, "rb") as fd: + logger.debug("Reading file") + # TODO: Read in lazy mode instead of copying everything in memory + file_content = 
fd.read() + logger.debug("Uploading file") + result = synchronous.create_store( + account=account, + file_content=file_content, + storage_engine=code_message.content.item_type, + channel=code_message.channel, + guess_mime_type=True, + ref=code_message.item_hash, + ) + logger.debug("Upload finished") + if print_message: + typer.echo(f"{result.json(indent=4)}") + finally: + # Prevent aiohttp unclosed connector warning + asyncio.run(get_fallback_session().close()) + +@app.command() +def unpersist( + hash: str, + private_key: Optional[str] = settings.PRIVATE_KEY_STRING, + private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, + debug: bool = False, +): + """Stop a persistent virtual machine by making it non-persistent""" + + setup_logging(debug) + + account = _load_account(private_key, private_key_file) + + existing: MessagesResponse = synchronous.get_messages(hashes=[hash]) + message: ProgramMessage = existing.messages[0] + content: ProgramContent = message.content.copy() + + content.on.persistent = False + content.replaces = message.item_hash + + result = synchronous.submit( + account=account, + content=content.dict(exclude_none=True), + message_type=message.type, + channel=message.channel, + ) + typer.echo(f"{result.json(indent=4)}") \ No newline at end of file diff --git a/src/aleph_client/commands/utils.py b/src/aleph_client/commands/utils.py new file mode 100644 index 00000000..0387cc89 --- /dev/null +++ b/src/aleph_client/commands/utils.py @@ -0,0 +1,69 @@ +import logging +from typer import echo +from typing import Optional + +def input_multiline() -> str: + """Prompt the user for a multiline input.""" + echo("Enter/Paste your content. Ctrl-D or Ctrl-Z ( windows ) to save it.") + contents = "" + while True: + try: + line = input() + except EOFError: + break + contents += line + "\n" + return contents + + +def setup_logging(debug: bool = False): + level = logging.DEBUG if debug else logging.WARNING + logging.basicConfig(level=level) + + +def yes_no_input(text: str, default: Optional[bool] = None): + while True: + if default is True: + response = input(f"{text} [Y/n] ") + elif default is False: + response = input(f"{text} [y/N] ") + else: + response = input(f"{text} ") + + if response.lower() in ("y", "yes"): + return True + elif response.lower() in ("n", "no"): + return False + elif response == "" and default is not None: + return default + else: + if default is None: + echo("Please enter 'y', 'yes', 'n' or 'no'") + else: + echo("Please enter 'y', 'yes', 'n', 'no' or nothing") + continue + + +def prompt_for_volumes(): + while yes_no_input("Add volume ?", default=False): + comment = input("Description: ") or None + mount = input("Mount: ") + persistent = yes_no_input("Persist on VM host ?", default=False) + if persistent: + name = input("Volume name: ") + size_mib = int(input("Size in MiB: ")) + yield { + "comment": comment, + "mount": mount, + "name": name, + "persistence": "host", + "size_mib": size_mib, + } + else: + ref = input("Ref: ") + use_latest = yes_no_input("Use latest version ?", default=True) + yield { + "comment": comment, + "mount": mount, + "ref": ref, + "use_latest": use_latest, + } From 4ea20d13cc167fcc6aa8b2c42f6d7ce94a3b0582 Mon Sep 17 00:00:00 2001 From: AmozPay Date: Thu, 6 Oct 2022 10:23:14 +0200 Subject: [PATCH 2/9] refacto: commands were cluttered + no cli help Solution: put all commands in a sub folder and use typer subcommands Use typer.Option and typer.Argument to document each command parameter --- src/aleph_client/commands/program.py | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/src/aleph_client/commands/program.py b/src/aleph_client/commands/program.py index 0b951236..4e46a966 100644 --- a/src/aleph_client/commands/program.py +++ b/src/aleph_client/commands/program.py @@ -260,4 +260,4 @@ def unpersist( message_type=message.type, channel=message.channel, ) - typer.echo(f"{result.json(indent=4)}") \ No newline at end of file + typer.echo(f"{result.json(indent=4)}") From fe4e46969b23800a0fc33616ca4e7073ccec69ab Mon Sep 17 00:00:00 2001 From: AmozPay Date: Tue, 30 Aug 2022 11:06:18 +0200 Subject: [PATCH 3/9] Feature: Needed docker to upload containers on vms Solution: Pull image using API and reproduce docker data dir --- .gitignore | 1 + Pipfile | 22 ++ docker/Dockerfile | 2 +- setup.cfg | 3 +- src/aleph_client/commands/container.py | 201 +++++++++++++++ .../commands/container/.gitignore | 2 + .../commands/container/docker_conf.py | 43 ++++ src/aleph_client/commands/container/image.py | 79 ++++++ src/aleph_client/commands/container/save.py | 46 ++++ .../commands/container/storage_drivers.py | 240 ++++++++++++++++++ .../commands/container/test_data/Dockerfile | 5 + .../container/test_data/create_tar_image.sh | 4 + .../container/test_data/custom-dockerd | 5 + src/aleph_client/commands/container/tests.py | 223 ++++++++++++++++ src/aleph_client/conf.py | 3 +- 15 files changed, 876 insertions(+), 3 deletions(-) create mode 100644 Pipfile create mode 100644 src/aleph_client/commands/container.py create mode 100644 src/aleph_client/commands/container/.gitignore create mode 100644 src/aleph_client/commands/container/docker_conf.py create mode 100644 src/aleph_client/commands/container/image.py create mode 100644 src/aleph_client/commands/container/save.py create mode 100644 src/aleph_client/commands/container/storage_drivers.py create mode 100644 src/aleph_client/commands/container/test_data/Dockerfile create mode 100644 src/aleph_client/commands/container/test_data/create_tar_image.sh create mode 100755 src/aleph_client/commands/container/test_data/custom-dockerd create mode 100644 src/aleph_client/commands/container/tests.py diff --git a/.gitignore b/.gitignore index c4734889..c4f3c084 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,7 @@ __pycache__/* .pydevproject .settings .idea +.vscode tags # Package files diff --git a/Pipfile b/Pipfile new file mode 100644 index 00000000..02aa350e --- /dev/null +++ b/Pipfile @@ -0,0 +1,22 @@ +[[source]] +url = "https://pypi.python.org/simple" +verify_ssl = true +name = "pypi" + +[dev-packages] +pylint = "*" + +[requires] +python_version = "3.7" + +[packages] +eth-account = "*" +paho-mqtt = "*" +click = "*" +certifi = "*" +python-magic = "*" +python-docker = "*" + +[packages.e1839a8] +path = "." +editable = true diff --git a/docker/Dockerfile b/docker/Dockerfile index 787d74ff..a97cf7b8 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -21,7 +21,7 @@ RUN pip install --upgrade pip wheel twine # Preinstall dependencies for faster steps RUN pip install --upgrade secp256k1 coincurve aiohttp eciespy python-magic typer RUN pip install --upgrade 'aleph-message~=0.2.3' eth_account pynacl base58 -RUN pip install --upgrade pytest pytest-cov pytest-asyncio mypy types-setuptools pytest-asyncio fastapi requests +RUN pip install --upgrade pytest pytest-cov pytest-asyncio mypy types-setuptools pytest-asyncio fastapi requests python-docker WORKDIR /opt/aleph-client/ COPY . . 
diff --git a/setup.cfg b/setup.cfg index bf583aa0..3fcec780 100644 --- a/setup.cfg +++ b/setup.cfg @@ -30,7 +30,7 @@ package_dir = # DON'T CHANGE THE FOLLOWING LINE! IT WILL BE UPDATED BY PYSCAFFOLD! setup_requires = pyscaffold>=3.2a0,<3.3a0 # Add here dependencies of your project (semicolon/line-separated), e.g. -install_requires = +install_requires = coincurve aiohttp>=3.8.0 eciespy @@ -39,6 +39,7 @@ install_requires = aleph-message~=0.2.3 eth_account>=0.4.0 python-magic + python-docker # The usage of test_requires is discouraged, see `Dependency Management` docs # tests_require = pytest; pytest-cov # Require a specific Python version, e.g. Python 2.7 or >= 3.4 diff --git a/src/aleph_client/commands/container.py b/src/aleph_client/commands/container.py new file mode 100644 index 00000000..712fd267 --- /dev/null +++ b/src/aleph_client/commands/container.py @@ -0,0 +1,201 @@ +import typer +import os +import json +import magic +import logging +import asyncio + +from typing import Optional, Dict, List +from shutil import make_archive +from aleph_client.account import _load_account +from aleph_client.conf import settings +from aleph_message.models.program import Encoding # type: ignore + +from aleph_client import synchronous + +from base64 import b32encode, b16decode +from typing import Optional, Dict, List + +from typing import Optional, Dict, List +from .container.save import save_tar + +from aleph_client.account import _load_account + +logger = logging.getLogger(__name__) +app = typer.Typer() + + + +from aleph_client.asynchronous import ( + get_fallback_session, + StorageEnum, +) + +from aleph_client.commands.utils import ( + yes_no_input, + input_multiline, + prompt_for_volumes, + yes_no_input +) + +app = typer.Typer() + +@app.command() +def container( + image: str = typer.Argument(..., help="Path to an image archive exported with docker save."), + path: str = typer.Argument(..., metavar="SCRIPT", help="A small script to start your container with parameters"), + remote: bool = typer.Option(False, "--remote", "-r", help=" If --remote, IMAGE is a registry to pull the image from. e.g: library/alpine, library/ubuntu:latest"), + from_daemon: bool = typer.Option(False, "--from-daemon", "-d", help=" If --from-daemon, IMAGE is an image in local docker deamon storage. You need docker installed for this command"), + channel: str = settings.DEFAULT_CHANNEL, + memory: int = settings.DEFAULT_VM_MEMORY, + vcpus: int = settings.DEFAULT_VM_VCPUS, + timeout_seconds: float = settings.DEFAULT_VM_TIMEOUT, + private_key: Optional[str] = settings.PRIVATE_KEY_STRING, + private_key_file: Optional[str] = settings.PRIVATE_KEY_FILE, + print_messages: bool = False, + print_code_message: bool = False, + print_program_message: bool = False, + runtime: str = None, + beta: bool = False, +): + """ + Deploy a docker container on Aleph virtual machines. + Unless otherwise specified, you don't need docker on your machine to run this command. 
+ """ + if remote or from_daemon: + raise NotImplementedError() + # echo(f"Downloading {image}") + # registry = Registry() + # tag = "latest" + # if ":" in image: + # l = image.split(":") + # tag = l[-1] + # image = l[0] + # print(tag) + # image_object = registry.pull_image(image, tag) + # manifest = registry.get_manifest_configuration(image, tag) + # image_archive = os.path.abspath(f"{str(uuid4())}.tar") + # image_object.write_filename(image_archive) + # image = image_archive + # print(manifest) + typer.echo("Preparing image for vm runtime") + docker_data_path = os.path.abspath("docker-data") + save_tar(image, docker_data_path, settings=settings.DOCKER_SETTINGS) + if settings.CODE_USES_SQUASHFS: + logger.debug("Creating squashfs archive...") + os.system(f"mksquashfs {docker_data_path} {docker_data_path}.squashfs -noappend") + docker_data_path = f"{docker_data_path}.squashfs" + assert os.path.isfile(docker_data_path) + encoding = Encoding.squashfs + path = os.path.abspath(path) + entrypoint = path + + # Create a zip archive from a directory + if os.path.isdir(path): + if settings.CODE_USES_SQUASHFS: + logger.debug("Creating squashfs archive...") + os.system(f"mksquashfs {path} {path}.squashfs -noappend") + path = f"{path}.squashfs" + assert os.path.isfile(path) + encoding = Encoding.squashfs + else: + logger.debug("Creating zip archive...") + make_archive(path, "zip", path) + path = path + ".zip" + encoding = Encoding.zip + elif os.path.isfile(path): + if path.endswith(".squashfs") or ( + magic and magic.from_file(path).startswith("Squashfs filesystem") + ): + encoding = Encoding.squashfs + elif _is_zip_valid(path): + encoding = Encoding.zip + else: + raise typer.Exit(3) + else: + typer.echo("No such file or directory") + raise typer.Exit(4) + + account = _load_account(private_key, private_key_file) + + runtime = ( + runtime + or input(f"Ref of runtime ? 
[{settings.DEFAULT_RUNTIME_ID}] ") + or settings.DEFAULT_RUNTIME_ID + ) + + volumes = [] + for volume in prompt_for_volumes(): + volumes.append(volume) + print() + + subscriptions: Optional[List[Dict]] + if beta and yes_no_input("Subscribe to messages ?", default=False): + content_raw = input_multiline() + try: + subscriptions = json.loads(content_raw) + except json.decoder.JSONDecodeError: + typer.echo("Not valid JSON") + raise typer.Exit(code=2) + else: + subscriptions = None + + try: + # Upload the source code + with open(path, "rb") as fd: + logger.debug("Reading file") + # TODO: Read in lazy mode instead of copying everything in memory + file_content = fd.read() + storage_engine = ( + StorageEnum.ipfs + if len(file_content) > 4 * 1024 * 1024 + else StorageEnum.storage + ) + logger.debug("Uploading file") + result = synchronous.create_store( + account=account, + file_content=file_content, + storage_engine=storage_engine, + channel=channel, + guess_mime_type=True, + ref=None, + ) + logger.debug("Upload finished") + if print_messages or print_code_message: + typer.echo(f"{json.dumps(result, indent=4)}") + program_ref = result["item_hash"] + + # Register the program + result = synchronous.create_program( + account=account, + program_ref=program_ref, + entrypoint=entrypoint, + runtime=runtime, + storage_engine=StorageEnum.storage, + channel=channel, + memory=memory, + vcpus=vcpus, + timeout_seconds=timeout_seconds, + encoding=encoding, + volumes=volumes, + subscriptions=subscriptions, + ) + logger.debug("Upload finished") + if print_messages or print_program_message: + typer.echo(f"{json.dumps(result, indent=4)}") + + hash: str = result["item_hash"] + hash_base32 = b32encode(b16decode(hash.upper())).strip(b"=").lower().decode() + + typer.echo( + f"Your program has been uploaded on Aleph .\n\n" + "Available on:\n" + f" {settings.VM_URL_PATH.format(hash=hash)}\n" + f" {settings.VM_URL_HOST.format(hash_base32=hash_base32)}\n" + "Visualise on:\n https://explorer.aleph.im/address/" + f"{result['chain']}/{result['sender']}/message/PROGRAM/{hash}\n" + ) + + finally: + # Prevent aiohttp unclosed connector warning + asyncio.get_event_loop().run_until_complete(get_fallback_session().close()) diff --git a/src/aleph_client/commands/container/.gitignore b/src/aleph_client/commands/container/.gitignore new file mode 100644 index 00000000..55720316 --- /dev/null +++ b/src/aleph_client/commands/container/.gitignore @@ -0,0 +1,2 @@ +tests/test-image.tar +docker-data diff --git a/src/aleph_client/commands/container/docker_conf.py b/src/aleph_client/commands/container/docker_conf.py new file mode 100644 index 00000000..954b3eab --- /dev/null +++ b/src/aleph_client/commands/container/docker_conf.py @@ -0,0 +1,43 @@ +from dataclasses import dataclass +from typing import Dict, NewType, Union +from enum import Enum +from shutil import which + +class StorageDriverEnum(Enum): + VFS = 1 + # OVERLAY2 = 2 + + +@dataclass +class VFSSettings: + optimize: bool = True # Keep only last layer and delete previous ones + use_tarsplit: bool = which("tar-split") is not None and which("tar") is not None + +DriverConf = NewType("DriverConf", VFSSettings) # Use Union to accomodate new features + +drivers_conf: Dict[StorageDriverEnum, DriverConf] = { + StorageDriverEnum.VFS: VFSSettings() +} + + +@dataclass +class StorageDriverSettings: + kind: StorageDriverEnum + conf: DriverConf + + def __init__(self, kind: StorageDriverEnum): + self.kind = kind + self.conf = drivers_conf[kind] + + +@dataclass +class DockerSettings: + 
storage_driver: StorageDriverSettings
+    populate: bool
+
+docker_settings = DockerSettings(
+    storage_driver=StorageDriverSettings(
+        kind=StorageDriverEnum.VFS
+    ),
+    populate=True
+)
diff --git a/src/aleph_client/commands/container/image.py b/src/aleph_client/commands/container/image.py
new file mode 100644
index 00000000..730f79b1
--- /dev/null
+++ b/src/aleph_client/commands/container/image.py
@@ -0,0 +1,80 @@
+from typing import List, Dict, Union, NewType
+import os
+import tarfile
+from tarfile import TarFile
+import json
+from hashlib import sha256
+
+Command = NewType('Command', Dict[str, str])
+ConfigValue = NewType('ConfigValue', Union[str, bool, None, Command])
+
+class Image:
+    config: Dict[str, ConfigValue]
+    image_digest: str
+    repositories: Dict[str, object]
+    archive_path: str
+
+    # Parent at index 0, child at len(list) - 1
+    layers_ids: List[str]
+    chain_ids: List[str]
+    diff_ids: List[str]
+
+    def to_dict(self):
+        return self.__dict__
+
+    @staticmethod
+    def get_tar_filenames(tar: TarFile) -> List[str]:
+        files = tar.getmembers()
+        filenames = []
+        for file in files:
+            filenames.append(file.get_info()["name"])
+        return filenames
+
+    def __load_metadata(self, tar: TarFile, file: str) -> Dict[str, str]:
+        return json.load(tar.extractfile(file))
+
+    def __init__(self, path: str):
+        if not os.path.exists(path):
+            raise ValueError("File does not exist")
+        if not tarfile.is_tarfile(path):
+            raise ValueError("Invalid tar archive")
+        self.archive_path = path
+        with tarfile.open(self.archive_path, "r") as tar:
+            manifest = self.__load_metadata(tar, "manifest.json")
+            self.repositories = self.__load_metadata(tar, "repositories")
+            self.image_digest = manifest[0]["Config"].split(".")[0]
+            self.config = self.__load_metadata(tar, f"{self.image_digest}.json")
+            self.layers_ids = list(map(
+                lambda name: name.split('/')[0],
+                manifest[0]["Layers"]
+            ))  # Only keep the layer id, not the file path
+            self.diff_ids = self.config["rootfs"]["diff_ids"]
+            self.chain_ids = self.__compute_chain_ids()
+
+    def __compute_chain_ids(self) -> List[str]:
+        chain_ids = []
+        # diff_ids are stored sequentially, from parent to child.
+        # If the file has been tampered with, this method cannot work.
+
+        # Recursion mirrors the spec's recursive definition; it is not
+        # faster than iteration. TODO: rewrite iteratively.
+        def recursive_compute_chain_id(index: int) -> str:
+            # ChainID(A) = DiffID(A)
+            # ChainID(A|B) = Digest(ChainID(A) + " " + DiffID(B))
+            # ChainID(A|B|C) = Digest(ChainID(A|B) + " " + DiffID(C))
+            # https://github.com/opencontainers/image-spec/blob/main/config.md
+            if index == 0:
+                diff_id = self.diff_ids[index]
+                chain_ids.append(diff_id)
+                return diff_id
+            chain_id = "sha256:" + sha256(
+                recursive_compute_chain_id(index - 1).encode()
+                + " ".encode()
+                + self.diff_ids[index].encode()
+            ).hexdigest()
+            chain_ids.append(chain_id)
+            return chain_id
+
+        recursive_compute_chain_id(len(self.layers_ids) - 1)
+
+        return chain_ids
diff --git a/src/aleph_client/commands/container/save.py b/src/aleph_client/commands/container/save.py
new file mode 100644
index 00000000..08d199ca
--- /dev/null
+++ b/src/aleph_client/commands/container/save.py
@@ -0,0 +1,52 @@
+import sys
+from .image import Image
+from .storage_drivers import create_storage_driver
+import os
+from shutil import rmtree
+from .docker_conf import docker_settings, DockerSettings
+
+dirs = {
+    "vfs": 0o710,
+    "image": 0o700,
+    "plugins": 0o700,
+    "swarm": 0o700,
+    "runtimes": 0o700,
+    "network": 0o750,
+    "containers": 0o710,
+    "trust": 0o700,
+    "volumes": 0o701,
+    "buildkit": 0o711,
+    "tmp": 0o700,
+    "containerd": 0o711,
+}
+
+def populate_dir(output_path: str):
+    path = os.path.abspath(output_path)
+    if os.path.exists(output_path) and os.path.isdir(output_path):
+        try:
+            rmtree(output_path)
+        except OSError as exc:
+            raise RuntimeError(f"Could not clear '{output_path}'") from exc
+    os.makedirs(output_path, 0o710)
+    for d, mode in dirs.items():
+        os.makedirs(os.path.join(path, d), mode)
+
+def save_tar(archive_path: str, output_path: str, settings: DockerSettings):
+    archive_path = os.path.abspath(archive_path)
+    output_path = os.path.abspath(output_path)
+    image = Image(archive_path)
+    if settings.populate:
+        populate_dir(output_path)
+    driver = create_storage_driver(image, output_path, settings)
+    driver.create_file_architecture()
+
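+# Example (illustrative) of calling save_tar directly with the default
+# VFS-based settings; the archive and output paths here are hypothetical:
+#
+#     from aleph_client.commands.container.save import save_tar
+#     from aleph_client.commands.container.docker_conf import docker_settings
+#
+#     save_tar("alpine.tar", "./docker-data", settings=docker_settings)
+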
+if __name__ == "__main__":
+    save_tar(sys.argv[1], sys.argv[2], docker_settings)
diff --git a/src/aleph_client/commands/container/storage_drivers.py b/src/aleph_client/commands/container/storage_drivers.py
new file mode 100644
index 00000000..106bfa6d
--- /dev/null
+++ b/src/aleph_client/commands/container/storage_drivers.py
@@ -0,0 +1,248 @@
+import tarfile
+from typing import Dict, Optional
+from .image import Image
+import os
+import json
+from uuid import uuid4
+import subprocess
+from shutil import rmtree
+import tempfile
+import gzip
+from .docker_conf import DockerSettings, StorageDriverEnum
+
+class IStorageDriver:
+    def create_file_architecture(self):
+        """
+        Reproduce the file layout Docker expects under /var/lib/docker
+        in output_dir, based on an Image object.
+        """
+        pass
+
+
+class AStorageDriver(IStorageDriver):
+
+    image: Image
+    output_dir: str
+    layer_ids_dict: Dict[str, str]
+    driver_dir: str
+
+    def __init__(self, image: Image, output_dir: str, driver_dir: str):
+        self.image = image
+        self.output_dir = output_dir
+        self.layer_ids_dict = {}
+        self.driver_dir = driver_dir
+
+    def create_file_architecture(self):
+        path = os.path.join(self.output_dir, "image", self.driver_dir)
+        os.makedirs(path, 0o700)
+        self.create_distribution(path)
+        self.create_repositories_json(path)
+        self.create_imagedb(path)
+        self.create_layerdb(self.output_dir)
+
+    def create_repositories_json(self, output_dir: str):
+        """
+        Reproduce /var/lib/docker/image/{storage_driver}/repositories.json
+        in output_dir based on an image object.
+        """
+        raise NotImplementedError("create_repositories_json must be implemented by subclasses")
+
+    def create_imagedb(self, output_dir: str):
+        """
+        Reproduce /var/lib/docker/image/{storage_driver}/imagedb
+        in output_dir based on an image object.
+        """
+        raise NotImplementedError("create_imagedb must be implemented by subclasses")
+
+    def create_layerdb(self, output_dir: str):
+        """
+        Reproduce /var/lib/docker/image/{storage_driver}/layerdb
+        in output_dir based on an image object after extracting the layers
+        to {output_dir}/{storage_driver}.
+        """
+        raise NotImplementedError("create_layerdb must be implemented by subclasses")
+
+    def create_distribution(self, output_dir: str):
+        """
+        Reproduce /var/lib/docker/image/{storage_driver}/distribution
+        in output_dir based on an image object.
+        """
+        raise NotImplementedError("create_distribution must be implemented by subclasses")
+
+
+
+# Since Aleph VMs may run with an unknown host configuration, the
+# storage driver in use can differ from one machine to another.
+# Although not the most performant, VFS is the most broadly compatible
+# storage driver, hence the choice here.
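+# With VFS, every layer is materialised as a full directory tree under
+# vfs/dir/, so disk usage grows with each extra layer; the `optimize`
+# flag in VFSSettings exists to keep only the last layer for that reason.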
+# Future use of an other storage driver such as Overlay2 might become +# available as compatibility checks are done on vms +class Vfs(AStorageDriver): + + def __init__(self, image: Image, output_dir: str, settings: DockerSettings): + super().__init__(image, output_dir, "vfs") + self.optimize = settings.storage_driver.conf.optimize + self.use_tarsplit = settings.storage_driver.conf.use_tarsplit + + def create_distribution(self, output_dir: str): + os.makedirs(os.path.join(output_dir, "distribution"), 0o700) + + def create_repositories_json(self, output_dir: str): + repositories = {} + for name, tags in self.image.repositories.items(): + repositories[name] = {} + for tag in tags.keys(): + repositories[name][f"{name}:{tag}"] = f"sha256:{self.image.image_digest}" + repositories = {"Repositories": repositories} + path = os.path.join(output_dir, "repositories.json") + with open(path, "w") as f: + f.write(json.dumps(repositories, separators=(',', ':'))) + os.chmod(path, 0o0600) + + def create_imagedb(self, output_dir: str): + os.makedirs(os.path.join(output_dir, "imagedb"), 0o700) + os.makedirs(os.path.join(output_dir, "imagedb", "content"), 0o700) + os.makedirs(os.path.join(output_dir, "imagedb", "metadata"), 0o700) + os.makedirs(os.path.join(output_dir, "imagedb", "content", "sha256"), 0o700) + os.makedirs(os.path.join(output_dir, "imagedb", "metadata", "sha256"), 0o700) + # os.makedirs(os.path.join(metadata, self.image.image_digest)) + content = os.path.join(output_dir, "imagedb", "content", "sha256") + path = os.path.join(content, self.image.image_digest) + with open(path, "w") as f: + f.write(json.dumps(self.image.config, separators=(',', ':'))) # This file must be dumped compactly in order to keep the correct sha256 digest + os.chmod(path, 0o0600) + # with open(os.path.join(metadata, self.image.image_digest, "parent"), "w") as f: + # f.write(self.image.config['config']['Image']) + return + + def create_layerdb(self, output_dir: str): + assert ( + len(self.image.chain_ids) == len(self.image.diff_ids) + and len(self.image.diff_ids) == len(self.image.layers_ids) + ) + layers_dir = os.path.join(output_dir, "vfs", "dir") + layerdb_path = os.path.join(output_dir, "image", "vfs", "layerdb") + os.makedirs(layerdb_path, 0o700) + os.makedirs(os.path.join(layerdb_path, "mounts"), 0o700) + os.makedirs(os.path.join(layerdb_path, "tmp"), 0o700) + layerdb_path = os.path.join(layerdb_path, "sha256") + os.makedirs(layerdb_path, 0o700) + + def save_layer_metadata(path: str, diff: str, cacheid: str, size: int, previous_chain_id: str or None): + dest = os.path.join(path, "diff") + with open(dest, "w") as fd: + fd.write(diff) + os.chmod(dest, 0o600) + dest = os.path.join(path, "cache-id") + with open(dest, "w") as fd: + fd.write(cacheid) + os.chmod(dest, 0o600) + dest = os.path.join(path, "size") + with open(dest, "w") as fd: + fd.write(str(size)) + os.chmod(dest, 0o600) + dest = os.path.join(path, "parent") + if previous_chain_id is not None: + with open(dest, "w") as fd: + fd.write(previous_chain_id) + os.chmod(dest, 0o600) + + + def copy_layer(src: str, dest: str) -> None: + for folder in os.listdir(src): + subprocess.check_output(["cp", "-r", os.path.join(src, folder), dest]) + + def compute_layer_size(tar_data_json_path: str) -> int: + size = 0 + with gzip.open(tar_data_json_path, "r") as archive: + data = json.loads( + "[" + + archive.read().decode().replace("}\n{", "},\n{") + + "]" + ) # fixes poor formatting from tar-split + for elem in data: + if "size" in elem.keys(): + size =+ elem["size"] + return 
size + + def extract_layer(path: str, archive_path: str, layerdb_subdir: str) -> int: + cwd = os.getcwd() + tmp_dir = tempfile.mkdtemp() + os.chdir(tmp_dir) + tar_src = os.path.join(tmp_dir, "layer.tar") + tar_dest = os.path.join(layer_id, "layer.tar") + with tarfile.open(archive_path, "r") as tar: + tar.extract(tar_dest) + os.rename(tar_dest, tar_src) + os.rmdir(layer_id) + os.chdir(path) + + # tar-split is used by docker to keep some archive metadata + # in order to compress the layer back with the exact same digest + # Mandatory if one plans to export a docker image to a tar file + # https://github.com/vbatts/tar-split + if self.use_tarsplit: + tar_data_json = os.path.join(layerdb_subdir, "tar-split.json.gz") + os.system(f"tar-split disasm --output {tar_data_json} {tar_src} | tar -C . -x") + size = compute_layer_size(tar_data_json) # Differs from expected. Only messes with docker image size listing + os.remove(tar_src) + + # Also works, but won't be able to export images + else: + with tarfile.open(tar_src, "r") as tar: + os.remove(tar_src) + tar.extractall() + size=0 + os.rmdir(tmp_dir) + os.chdir(cwd) + return size + + previous_cache_id = None + for i in range(0, len(self.image.chain_ids)): + chain_id = self.image.chain_ids[i] + layerdb_subdir = os.path.join(layerdb_path, chain_id.replace("sha256:", "")) + os.makedirs(layerdb_subdir, 0o700) + cache_id = (str(uuid4()) + str(uuid4())).replace("-", "") + + layer_id = self.image.layers_ids[i] + current_layer_path = os.path.join(layers_dir, cache_id) + os.makedirs(current_layer_path, 0o700) + + + # Merge layers + # The last layer contains changes from all the previous ones + if previous_cache_id: + previous_layer_path = os.path.join(layers_dir, previous_cache_id) + copy_layer(previous_layer_path, current_layer_path) + if (self.optimize): + rmtree(previous_layer_path) + previous_cache_id = cache_id + size = extract_layer(current_layer_path, self.image.archive_path, layerdb_subdir) + save_layer_metadata( + path=layerdb_subdir, + diff=self.image.diff_ids[i], + cacheid=cache_id, + size=size, + previous_chain_id=self.image.chain_ids[i - 1] + if i > 0 + else None + ) + + +def create_storage_driver( + image: Image, + output_dir: str, + settings: DockerSettings +) -> IStorageDriver: + if settings.storage_driver.kind == StorageDriverEnum.VFS: + return Vfs(image, output_dir, settings) + raise NotImplementedError("Only vfs supported now") \ No newline at end of file diff --git a/src/aleph_client/commands/container/test_data/Dockerfile b/src/aleph_client/commands/container/test_data/Dockerfile new file mode 100644 index 00000000..3ffe8384 --- /dev/null +++ b/src/aleph_client/commands/container/test_data/Dockerfile @@ -0,0 +1,5 @@ +FROM alpine:latest + +WORKDIR /app + +CMD ["/bin/sh"] \ No newline at end of file diff --git a/src/aleph_client/commands/container/test_data/create_tar_image.sh b/src/aleph_client/commands/container/test_data/create_tar_image.sh new file mode 100644 index 00000000..2f4175c2 --- /dev/null +++ b/src/aleph_client/commands/container/test_data/create_tar_image.sh @@ -0,0 +1,4 @@ +#!/bin/sh + +docker build -t test-image . 
+docker save test-image > test-image.tar || rm test-image.tar
diff --git a/src/aleph_client/commands/container/test_data/custom-dockerd b/src/aleph_client/commands/container/test_data/custom-dockerd
new file mode 100755
index 00000000..7a672d9c
--- /dev/null
+++ b/src/aleph_client/commands/container/test_data/custom-dockerd
@@ -0,0 +1,5 @@
+#!/bin/sh
+systemctl stop docker.service
+dockerd --data-root $(pwd)/docker-data
+systemctl restart docker.socket
+systemctl start docker.service
diff --git a/src/aleph_client/commands/container/tests.py b/src/aleph_client/commands/container/tests.py
new file mode 100644
index 00000000..3cbe62e6
--- /dev/null
+++ b/src/aleph_client/commands/container/tests.py
@@ -0,0 +1,223 @@
+import unittest
+import os
+import time
+from typing import List
+import filecmp
+import subprocess
+from shutil import rmtree
+from save import save_tar
+from conf import settings
+
+TEST_DIR = os.path.abspath("test_data")
+DOCKER_DATA = os.path.join(TEST_DIR, "docker")
+IMAGE_NAME = "test-image"
+TEST_DOCKER_DATA = os.path.join(TEST_DIR, "docker.emulate")
+IMAGE_ARCHIVE = os.path.join(TEST_DIR, f"{IMAGE_NAME}.tar")
+
+# TODO: setup for following test cases:
+# - VFS optimization is turned on
+# - tar-split is not used
+
+def compare_folders_content(folder1: str, folder2: str):
+    dcmp = filecmp.dircmp(folder1, folder2)
+    def recursive_cmp(dcmp):
+        diff = dcmp.left_only + dcmp.right_only + dcmp.diff_files
+        for sub_dcmp in dcmp.subdirs.values():
+            diff += recursive_cmp(sub_dcmp)
+
+        return diff
+
+    return recursive_cmp(dcmp)
+
+docker_daemon: subprocess.Popen = None
+
+class TestLoadImage(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls) -> None:
+        def cleanup_docker():
+            os.system(f"rm -rf {DOCKER_DATA}")
+            os.system("systemctl stop docker.service")
+        cls.docker_daemon = subprocess.Popen(["dockerd", "--data-root", DOCKER_DATA], stderr=subprocess.DEVNULL)
+        time.sleep(3)
+
+        def build_test_image() -> bool:
+            if os.path.exists(IMAGE_ARCHIVE):
+                return True
+            return (
+                os.system(f"docker build -t {IMAGE_NAME} {TEST_DIR}") == 0
+                and os.system(f"docker save {IMAGE_NAME} > {IMAGE_ARCHIVE}") == 0
+            )
+
+        def load_image():
+            os.system(f"docker load -i {IMAGE_ARCHIVE}")
+
+        if not build_test_image():
+            raise Exception("Could not properly build image")
+        cleanup_docker()
+        load_image()
+        settings.storage_driver.conf.optimize = False
+        save_tar(IMAGE_ARCHIVE, TEST_DOCKER_DATA, settings)
+
+    @classmethod
+    def tearDownClass(cls) -> None:
+        rmtree(TEST_DOCKER_DATA)
+        rmtree(DOCKER_DATA)
+        if cls.docker_daemon is not None:
+            print("KILLING DOCKERD")
+            cls.docker_daemon.kill()
+        time.sleep(3)
+        os.system("systemctl restart docker.socket")
+        time.sleep(3)
+        os.system("systemctl restart docker.service")
+
+    def test_dir_creation(self) -> None:
+        self.assertTrue(os.path.isdir(f"{TEST_DOCKER_DATA}"))
+
+    def folder_cmp(self, expected_path: str, result_path: str) -> List[bool]:
+        res = []
+        expected_result = os.listdir(expected_path)
+        result = os.listdir(result_path)
+        self.assertEqual(len(expected_result), len(result))
+        for folder in expected_result:
+            res.append(folder in result)
+        return res
+
+    def permissions_cmp(self, expected_path, actual_path):
+        res = []
+        expected_files = os.listdir(expected_path)
+        for f in expected_files:
+            expected_mode = os.stat(os.path.join(expected_path, f)).st_mode
+            actual_mode = os.stat(os.path.join(actual_path, f)).st_mode
+            if expected_mode != actual_mode:
+                print(os.path.join(expected_path, f), oct(expected_mode), oct(actual_mode))
+
res.append(expected_mode == actual_mode) + return res + + def test_docker_dir_same(self) -> None: + for res in self.folder_cmp(DOCKER_DATA, TEST_DOCKER_DATA): + self.assertTrue(res) + for res in self.permissions_cmp(DOCKER_DATA, TEST_DOCKER_DATA): + self.assertTrue(res) + + def test_docker_image_dir_same(self) -> None: + for res in self.folder_cmp( + os.path.join(DOCKER_DATA, "image"), + os.path.join(TEST_DOCKER_DATA, "image") + ): + self.assertTrue(res) + for res in self.permissions_cmp( + os.path.join(DOCKER_DATA, "image"), + os.path.join(TEST_DOCKER_DATA, "image") + ): + self.assertTrue(res) + + def test_docker_image_vfs_dir_same(self) -> None: + for res in self.folder_cmp( + os.path.join(DOCKER_DATA, "image", "vfs"), + os.path.join(TEST_DOCKER_DATA, "image", "vfs") + ): + self.assertTrue(res) + for res in self.permissions_cmp( + os.path.join(DOCKER_DATA, "image", "vfs"), + os.path.join(TEST_DOCKER_DATA, "image", "vfs") + ): + self.assertTrue(res) + + def test_compare_repositories_json(self) -> None: + path = os.path.join("image", "vfs", "repositories.json") + expected_result_path = os.path.join(DOCKER_DATA, path) + result_path = os.path.join(TEST_DOCKER_DATA, path) + self.assertTrue(filecmp.cmp(expected_result_path, result_path)) + + def test_imagedb_same(self) -> None: + for res in self.folder_cmp( + os.path.join(DOCKER_DATA, "image", "vfs", "imagedb"), + os.path.join(TEST_DOCKER_DATA, "image", "vfs", "imagedb") + ): + self.assertTrue(res) + for res in self.permissions_cmp( + os.path.join(DOCKER_DATA, "image", "vfs", "imagedb"), + os.path.join(TEST_DOCKER_DATA, "image", "vfs", "imagedb") + ): + self.assertTrue(res) + + def test_imagedb_content_same(self) -> None: + path = os.path.join("image", "vfs", "imagedb", "content", "sha256") + for res in self.folder_cmp( + os.path.join(DOCKER_DATA, path), + os.path.join(TEST_DOCKER_DATA, path) + ): + self.assertTrue(res) + + for res in self.permissions_cmp( + os.path.join(DOCKER_DATA, path), + os.path.join(TEST_DOCKER_DATA, path) + ): + self.assertTrue(res) + + def test_imagedb_meta_same(self) -> None: + path = os.path.join("image", "vfs", "imagedb", "metadata", "sha256") + for res in self.folder_cmp( + os.path.join(DOCKER_DATA, path), + os.path.join(TEST_DOCKER_DATA, path) + ): + self.assertTrue(res) + for res in self.permissions_cmp( + os.path.join(DOCKER_DATA, path), + os.path.join(TEST_DOCKER_DATA, path) + ): + self.assertTrue(res) + + def test_compare_imagedb_files(self) -> None: + path = os.path.join("image", "vfs", "imagedb", "content", "sha256") + expected_result_dir = os.path.join(DOCKER_DATA, path) + result_dir = os.path.join(TEST_DOCKER_DATA, path) + for f in os.listdir(expected_result_dir): + result_file = os.path.join(result_dir, f) + expected_result_file = os.path.join(expected_result_dir, f) + self.assertTrue(filecmp.cmp(expected_result_file, result_file)) + + def test_compare_layerdb_same(self) -> None: + path = os.path.join("image", "vfs", "layerdb", "sha256") + for res in self.folder_cmp( + os.path.join(DOCKER_DATA, path), + os.path.join(TEST_DOCKER_DATA, path) + ): + self.assertTrue(res) + for res in self.permissions_cmp( + os.path.join(DOCKER_DATA, path), + os.path.join(TEST_DOCKER_DATA, path) + ): + self.assertTrue(res) + + def test_compare_layerdb_files(self) -> None: + path = os.path.join("image", "vfs", "layerdb", "sha256") + for folder in os.listdir(os.path.join(DOCKER_DATA, path)): + for f in os.listdir(os.path.join(DOCKER_DATA, path, folder)): + if f == "size": # not ready yet + continue + result_file = 
os.path.join(TEST_DOCKER_DATA, path, folder, f) + expected_result_file = os.path.join(DOCKER_DATA, path, folder, f) + res = filecmp.cmp(result_file, expected_result_file) + if f == "cache-id": # uuid should not be identical + self.assertFalse(res) + else: + self.assertTrue(res) + + def test_compare_layers(self) -> None: + path = os.path.join("image", "vfs", "layerdb", "sha256") + for folder in os.listdir(os.path.join(DOCKER_DATA, path)): + with open(os.path.join(DOCKER_DATA, path, folder, "cache-id"), "r") as f: + cache_id1 = f.read() + with open(os.path.join(TEST_DOCKER_DATA, path, folder, "cache-id"), "r") as f: + cache_id2 = f.read() + + res = compare_folders_content( + os.path.join(DOCKER_DATA, "vfs", "dir", cache_id1), + os.path.join(TEST_DOCKER_DATA, "vfs", "dir", cache_id2), + ) + self.assertEqual(len(res), 0) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/src/aleph_client/conf.py b/src/aleph_client/conf.py index fba55163..8c5cfaa1 100644 --- a/src/aleph_client/conf.py +++ b/src/aleph_client/conf.py @@ -1,7 +1,7 @@ from pathlib import Path from shutil import which from typing import Optional - +from .docker.docker_conf import docker_settings from pydantic import BaseSettings, Field @@ -32,6 +32,7 @@ class Settings(BaseSettings): VM_URL_PATH = "https://aleph.sh/vm/{hash}" VM_URL_HOST = "https://{hash_base32}.aleph.sh" + DOCKER_SETTINGS = docker_settings class Config: env_prefix = "ALEPH_" From 37a1893e8a66cdd55a7109cc6cbbf8a67972b1cd Mon Sep 17 00:00:00 2001 From: AmozPay Date: Thu, 13 Oct 2022 14:55:23 +0200 Subject: [PATCH 4/9] add iterative image building --- src/aleph_client/commands/container/image.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/aleph_client/commands/container/image.py b/src/aleph_client/commands/container/image.py index 730f79b1..c5c402f2 100644 --- a/src/aleph_client/commands/container/image.py +++ b/src/aleph_client/commands/container/image.py @@ -74,6 +74,23 @@ def recursive_compute_chain_id(index: int) -> str: chain_ids.append(chain_id) return chain_id + + + # TODO: test this iterative version + iterative_chain_ids = [] + def iterative_compute_chain_id(index: int) -> str: + diff_id = self.diff_ids[0] + iterative_chain_ids.append(diff_id) + i = 1 + while i < index: + chain_id = "sha256:" + sha256( + iterative_chain_ids[i - 1].encode() + + " ".encode() + + self.diff_ids[i].encode() + ).hexdigest() + iterative_chain_ids.append(chain_id) + return + recursive_compute_chain_id(len(self.layers_ids) - 1) return chain_ids From 4fe74531bf58e674f865c6cb28a93c978b7c5418 Mon Sep 17 00:00:00 2001 From: AmozPay Date: Fri, 18 Nov 2022 14:38:30 +0100 Subject: [PATCH 5/9] Fix: program.py contained double imports --- src/aleph_client/commands/program.py | 31 +++++++--------------------- 1 file changed, 7 insertions(+), 24 deletions(-) diff --git a/src/aleph_client/commands/program.py b/src/aleph_client/commands/program.py index 4e46a966..dd16fc2d 100644 --- a/src/aleph_client/commands/program.py +++ b/src/aleph_client/commands/program.py @@ -1,38 +1,26 @@ import typer from typing import Optional, Dict, List -from aleph_client.types import AccountFromPrivateKey -from aleph_client.account import _load_account -from aleph_client.conf import settings from pathlib import Path import asyncio -from aleph_client import synchronous import json from zipfile import BadZipFile -from aleph_client.commands import help_strings - -import asyncio -import json import logging from base64 import b32encode, b16decode -from 
pathlib import Path -from typing import Optional, Dict, List -from zipfile import BadZipFile -import typer from aleph_message.models import ( ProgramMessage, StoreMessage, + MessagesResponse, + ProgramContent, ) +from aleph_client import synchronous +from aleph_client.conf import settings +from aleph_client.commands import help_strings from aleph_client.types import AccountFromPrivateKey from aleph_client.account import _load_account from aleph_client.utils import create_archive -logger = logging.getLogger(__name__) -app = typer.Typer() - - - from aleph_client.asynchronous import ( get_fallback_session, StorageEnum, @@ -45,14 +33,9 @@ yes_no_input ) -from aleph_message.models import ( - ProgramMessage, - StoreMessage, - MessagesResponse, - ProgramContent, -) - +logger = logging.getLogger(__name__) app = typer.Typer() + @app.command() def upload( path: Path = typer.Argument(..., help="Path to your source code"), From 7e9d67bd250814554684d79e3fbc777fcd45799c Mon Sep 17 00:00:00 2001 From: AmozPay Date: Fri, 18 Nov 2022 14:40:14 +0100 Subject: [PATCH 6/9] Feature: could not mount docker volumes --- src/aleph_client/__main__.py | 5 +- .../cli_command.py} | 164 ++++++++---------- src/aleph_client/conf.py | 6 +- 3 files changed, 83 insertions(+), 92 deletions(-) rename src/aleph_client/commands/{container.py => container/cli_command.py} (53%) diff --git a/src/aleph_client/__main__.py b/src/aleph_client/__main__.py index 6711339e..1abe8432 100644 --- a/src/aleph_client/__main__.py +++ b/src/aleph_client/__main__.py @@ -10,12 +10,14 @@ from aleph_client.types import AccountFromPrivateKey from aleph_client.account import _load_account from aleph_client.conf import settings + +from .commands.container import cli_command as container from .commands import ( files, message, program, help_strings, - aggregate + aggregate, ) @@ -26,6 +28,7 @@ app.add_typer(message.app, name="message", help="Post, amend, watch and forget messages on Aleph.im") app.add_typer(program.app, name="program", help="Upload and update programs on Aleph's VM") app.add_typer(aggregate.app, name="aggregate", help="Manage aggregate messages on Aleph.im") +app.add_typer(container.app, name="container", help="Upload docker containers as programs on Aleph.im") @app.command() def whoami( diff --git a/src/aleph_client/commands/container.py b/src/aleph_client/commands/container/cli_command.py similarity index 53% rename from src/aleph_client/commands/container.py rename to src/aleph_client/commands/container/cli_command.py index 712fd267..ce40b68b 100644 --- a/src/aleph_client/commands/container.py +++ b/src/aleph_client/commands/container/cli_command.py @@ -1,31 +1,23 @@ import typer import os import json -import magic import logging import asyncio +from pathlib import Path from typing import Optional, Dict, List -from shutil import make_archive -from aleph_client.account import _load_account -from aleph_client.conf import settings -from aleph_message.models.program import Encoding # type: ignore - -from aleph_client import synchronous - from base64 import b32encode, b16decode from typing import Optional, Dict, List -from typing import Optional, Dict, List -from .container.save import save_tar +from aleph_message.models import StoreMessage +from aleph_client import synchronous +from aleph_client.account import _load_account, AccountFromPrivateKey +from aleph_client.conf import settings +from aleph_message.models.program import Encoding # type: ignore +from aleph_client.commands import help_strings from aleph_client.account import 
_load_account -logger = logging.getLogger(__name__) -app = typer.Typer() - - - from aleph_client.asynchronous import ( get_fallback_session, StorageEnum, @@ -38,31 +30,64 @@ yes_no_input ) +from .save import save_tar + +logger = logging.getLogger(__name__) app = typer.Typer() +def upload_file( + path: str, + account: AccountFromPrivateKey, + channel: str, + print_messages: bool = False, + print_code_message: bool = False +) -> StoreMessage: + with open(path, "rb") as fd: + logger.debug("Reading file") + # TODO: Read in lazy mode instead of copying everything in memory + file_content = fd.read() + storage_engine = ( + StorageEnum.ipfs + if len(file_content) > 4 * 1024 * 1024 + else StorageEnum.storage + ) + logger.debug("Uploading file") + result = synchronous.create_store( + account=account, + file_content=file_content, + storage_engine=storage_engine, + channel=channel, + guess_mime_type=True, + ref=None, + ) + logger.debug("Upload finished") + if print_messages or print_code_message: + typer.echo(f"{json.dumps(result, indent=4)}") + return result + @app.command() -def container( +def upload( image: str = typer.Argument(..., help="Path to an image archive exported with docker save."), path: str = typer.Argument(..., metavar="SCRIPT", help="A small script to start your container with parameters"), - remote: bool = typer.Option(False, "--remote", "-r", help=" If --remote, IMAGE is a registry to pull the image from. e.g: library/alpine, library/ubuntu:latest"), + from_remote: bool = typer.Option(False, "--from-remote", "-r", help=" If --from-remote, IMAGE is a registry to pull the image from. e.g: library/alpine, library/ubuntu:latest"), from_daemon: bool = typer.Option(False, "--from-daemon", "-d", help=" If --from-daemon, IMAGE is an image in local docker deamon storage. You need docker installed for this command"), - channel: str = settings.DEFAULT_CHANNEL, - memory: int = settings.DEFAULT_VM_MEMORY, - vcpus: int = settings.DEFAULT_VM_VCPUS, - timeout_seconds: float = settings.DEFAULT_VM_TIMEOUT, - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[str] = settings.PRIVATE_KEY_FILE, - print_messages: bool = False, - print_code_message: bool = False, - print_program_message: bool = False, - runtime: str = None, + channel: str = typer.Option(settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), + memory: int = typer.Option(settings.DEFAULT_VM_MEMORY, help="Maximum memory allocation on vm in MiB"), + vcpus: int = typer.Option(settings.DEFAULT_VM_VCPUS, help="Number of virtual cpus to allocate."), + timeout_seconds: float = typer.Option(settings.DEFAULT_VM_TIMEOUT, help="If vm is not called after [timeout_seconds] it will shutdown"), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + docker_mountpoint: Optional[Path] = typer.Option(settings.DEFAULT_DOCKER_VOLUME_MOUNTPOINT, "--docker-mountpoint", help="The path where the created docker image volume will be mounted"), + print_messages: bool = typer.Option(False), + print_code_message: bool = typer.Option(False), + print_program_message: bool = typer.Option(False), beta: bool = False, ): """ Deploy a docker container on Aleph virtual machines. Unless otherwise specified, you don't need docker on your machine to run this command. 
""" - if remote or from_daemon: + if from_remote or from_daemon: raise NotImplementedError() # echo(f"Downloading {image}") # registry = Registry() @@ -81,48 +106,19 @@ def container( typer.echo("Preparing image for vm runtime") docker_data_path = os.path.abspath("docker-data") save_tar(image, docker_data_path, settings=settings.DOCKER_SETTINGS) - if settings.CODE_USES_SQUASHFS: - logger.debug("Creating squashfs archive...") - os.system(f"mksquashfs {docker_data_path} {docker_data_path}.squashfs -noappend") - docker_data_path = f"{docker_data_path}.squashfs" - assert os.path.isfile(docker_data_path) - encoding = Encoding.squashfs + if not settings.CODE_USES_SQUASHFS: + typer.echo("The command mksquashfs must be installed!") + typer.Exit(2) + logger.debug("Creating squashfs archive...") + os.system(f"mksquashfs {docker_data_path} {docker_data_path}.squashfs -noappend") + docker_data_path = f"{docker_data_path}.squashfs" + assert os.path.isfile(docker_data_path) + encoding = Encoding.squashfs path = os.path.abspath(path) entrypoint = path - # Create a zip archive from a directory - if os.path.isdir(path): - if settings.CODE_USES_SQUASHFS: - logger.debug("Creating squashfs archive...") - os.system(f"mksquashfs {path} {path}.squashfs -noappend") - path = f"{path}.squashfs" - assert os.path.isfile(path) - encoding = Encoding.squashfs - else: - logger.debug("Creating zip archive...") - make_archive(path, "zip", path) - path = path + ".zip" - encoding = Encoding.zip - elif os.path.isfile(path): - if path.endswith(".squashfs") or ( - magic and magic.from_file(path).startswith("Squashfs filesystem") - ): - encoding = Encoding.squashfs - elif _is_zip_valid(path): - encoding = Encoding.zip - else: - raise typer.Exit(3) - else: - typer.echo("No such file or directory") - raise typer.Exit(4) - account = _load_account(private_key, private_key_file) - runtime = ( - runtime - or input(f"Ref of runtime ? 
[{settings.DEFAULT_RUNTIME_ID}] ")
-        or settings.DEFAULT_RUNTIME_ID
-    )
 
     volumes = []
     for volume in prompt_for_volumes():
@@ -141,36 +137,21 @@
     subscriptions = None
 
     try:
-        # Upload the source code
-        with open(path, "rb") as fd:
-            logger.debug("Reading file")
-            # TODO: Read in lazy mode instead of copying everything in memory
-            file_content = fd.read()
-            storage_engine = (
-                StorageEnum.ipfs
-                if len(file_content) > 4 * 1024 * 1024
-                else StorageEnum.storage
-            )
-            logger.debug("Uploading file")
-            result = synchronous.create_store(
-                account=account,
-                file_content=file_content,
-                storage_engine=storage_engine,
-                channel=channel,
-                guess_mime_type=True,
-                ref=None,
-            )
-            logger.debug("Upload finished")
-            if print_messages or print_code_message:
-                typer.echo(f"{json.dumps(result, indent=4)}")
-            program_ref = result["item_hash"]
+        docker_upload_result: StoreMessage = upload_file(docker_data_path, account, channel, print_messages, print_code_message)
+        volumes.append({
+            "comment": "Docker container volume",
+            "mount": docker_mountpoint,
+            "ref": docker_upload_result["item_hash"],
+            "use_latest": True,
+        })
+        program_result: StoreMessage = upload_file(path, account, channel, print_messages, print_code_message)
 
         # Register the program
         result = synchronous.create_program(
             account=account,
-            program_ref=program_ref,
+            program_ref=program_result["item_hash"],
             entrypoint=entrypoint,
-            runtime=runtime,
+            runtime=settings.DEFAULT_DOCKER_RUNTIME_ID,
             storage_engine=StorageEnum.storage,
             channel=channel,
             memory=memory,
@@ -179,6 +160,9 @@
             encoding=encoding,
             volumes=volumes,
             subscriptions=subscriptions,
+            environment_variables={
+                "DOCKER_MOUNTPOINT": docker_mountpoint
+            }
         )
         logger.debug("Upload finished")
         if print_messages or print_program_message:
diff --git a/src/aleph_client/conf.py b/src/aleph_client/conf.py
index 8c5cfaa1..e6ee9f32 100644
--- a/src/aleph_client/conf.py
+++ b/src/aleph_client/conf.py
@@ -1,7 +1,7 @@
 from pathlib import Path
 from shutil import which
 from typing import Optional
-from .docker.docker_conf import docker_settings
+from .commands.container.docker_conf import docker_settings
 from pydantic import BaseSettings, Field
 
@@ -24,6 +24,10 @@ class Settings(BaseSettings):
     DEFAULT_RUNTIME_ID: str = (
         "bd79839bf96e595a06da5ac0b6ba51dea6f7e2591bb913deccded04d831d29f4"
     )
+    DEFAULT_DOCKER_RUNTIME_ID: str = (
+        "bd79839bf96e595a06da5ac0b6ba51dea6f7e2591bb913deccded04d831d29f4"  # TODO: Replace
+    )
+    DEFAULT_DOCKER_VOLUME_MOUNTPOINT: str = "/docker_aleph"
     DEFAULT_VM_MEMORY: int = 128
     DEFAULT_VM_VCPUS: int = 1
     DEFAULT_VM_TIMEOUT: float = 30.0

From 5c2256c4f5e383edba7bdc5d06b312a452bc73d5 Mon Sep 17 00:00:00 2001
From: AmozPay
Date: Thu, 24 Nov 2022 11:12:49 +0100
Subject: [PATCH 7/9] Feature: ChainIds computing was not pure + Max docker
 layers was 1000

Solution: Use function rather than class method depending on self + use
iteration rather than recursion
---
 src/aleph_client/commands/container/image.py | 76 ++++++++------------
 1 file changed, 28 insertions(+), 48 deletions(-)

diff --git a/src/aleph_client/commands/container/image.py b/src/aleph_client/commands/container/image.py
index c5c402f2..2a478ec1 100644
--- a/src/aleph_client/commands/container/image.py
+++ b/src/aleph_client/commands/container/image.py
@@ -8,6 +8,30 @@
 Command = NewType('Command', Dict[str, str])
 ConfigValue = NewType('ConfigValue', Union[str, bool, None, Command])
 
+
+def compute_chain_ids(diff_ids: List[str], layers_ids: List[str]) -> List[str]:
+    # diff_ids are stored sequentially, from parent to child.
+    # If the file has been tampered with, this method cannot work.
+    # ChainID(A) = DiffID(A)
+    # ChainID(A|B) = Digest(ChainID(A) + " " + DiffID(B))
+    # ChainID(A|B|C) = Digest(ChainID(A|B) + " " + DiffID(C))
+    # https://github.com/opencontainers/image-spec/blob/main/config.md
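+    #
+    # Worked example with two hypothetical digests (illustration only):
+    #   diff_ids  = ["sha256:aaa...", "sha256:bbb..."]
+    #   chain_ids[0] == "sha256:aaa..."
+    #   chain_ids[1] == "sha256:" + sha256(b"sha256:aaa... sha256:bbb...").hexdigest()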
+    index = 0
+    chain_ids = []
+    diff_id = diff_ids[index]
+    chain_ids.append(diff_id)
+    index += 1
+    while index < len(layers_ids):
+        chain_id = "sha256:" + sha256(
+            chain_ids[index - 1].encode()
+            + " ".encode()
+            + diff_ids[index].encode()
+        ).hexdigest()
+        chain_ids.append(chain_id)
+        index += 1
+    return chain_ids
+
+
 class Image:
     config: Dict[str, ConfigValue]
     image_digest: str
@@ -42,55 +66,11 @@ def __init__(self, path: str):
             manifest = self.__load_metadata(tar, "manifest.json")
             self.repositories = self.__load_metadata(tar, "repositories")
             self.image_digest = manifest[0]["Config"].split(".")[0]
-            self.config = self.__load_metadata(tar, f"{self.image_digest}.json")
+            self.config = self.__load_metadata(
+                tar, f"{self.image_digest}.json")
             self.layers_ids = list(map(
                 lambda name: name.split('/')[0],
                 manifest[0]["Layers"]
-            )) # Only keep the Layer id, not the file path
+            ))  # Only keep the Layer id, not the file path
             self.diff_ids = self.config["rootfs"]["diff_ids"]
-            self.chain_ids = self.__compute_chain_ids()
-
-    def __compute_chain_ids(self) -> List[str]:
-        chain_ids = []
-        # diff_ids are stored sequentially, from parent to child.
-        # If the file has been tempered, this method cannot work.
-
-        # I used recursion because it was simpler to execute, not because it's better
-        # cause it's not. TODO: use iteration
-        def recursive_compute_chain_id(index: int) -> str:
-            # ChainID(A) = DiffID(A)
-            # ChainID(A|B) = Digest(ChainID(A) + " " + DiffID(B))
-            # ChainID(A|B|C) = Digest(ChainID(A|B) + " " + DiffID(C))
-            # https://github.com/opencontainers/image-spec/blob/main/config.md
-            if index == 0:
-                diff_id = self.diff_ids[index]
-                chain_ids.append(diff_id)
-                return diff_id
-            chain_id = "sha256:" + sha256(
-                recursive_compute_chain_id(index - 1).encode()
-                + " ".encode()
-                + self.diff_ids[index].encode()
-            ).hexdigest()
-            chain_ids.append(chain_id)
-            return chain_id
-
-
-
-        # TODO: test this iterative version
-        iterative_chain_ids = []
-        def iterative_compute_chain_id(index: int) -> str:
-            diff_id = self.diff_ids[0]
-            iterative_chain_ids.append(diff_id)
-            i = 1
-            while i < index:
-                chain_id = "sha256:" + sha256(
-                    iterative_chain_ids[i - 1].encode()
-                    + " ".encode()
-                    + self.diff_ids[i].encode()
-                ).hexdigest()
-                iterative_chain_ids.append(chain_id)
-            return
-
-        recursive_compute_chain_id(len(self.layers_ids) - 1)
-
-        return chain_ids
+            self.chain_ids = compute_chain_ids(self.diff_ids, self.layers_ids)

From a7c81bfbea481e6e7061a16ab467b58f020c97f9 Mon Sep 17 00:00:00 2001
From: AmozPay
Date: Thu, 1 Dec 2022 10:13:56 +0100
Subject: [PATCH 8/9] Feature: Could not save docker images locally

Solution: Create a new subcommand, "aleph container create"
---
 .../commands/container/cli_command.py         | 105 ++++++++++++------
 src/aleph_client/commands/container/save.py   |  15 +--
 .../commands/container/storage_drivers.py     |  57 +++++++---
 .../commands/container/test_data/Dockerfile   |   2 +
 .../container/test_data/custom-dockerd        |   0
 src/aleph_client/commands/container/tests.py  |  28 +++--
 src/aleph_client/commands/container/utils.py  |  53 +++++++++
 7 files changed, 191 insertions(+), 69 deletions(-)
 mode change 100755 => 100644 src/aleph_client/commands/container/test_data/custom-dockerd
 create mode 100644 src/aleph_client/commands/container/utils.py

diff --git a/src/aleph_client/commands/container/cli_command.py b/src/aleph_client/commands/container/cli_command.py
index ce40b68b..78b03f28 100644
--- a/src/aleph_client/commands/container/cli_command.py
+++ b/src/aleph_client/commands/container/cli_command.py
@@ -9,7 +9,10 @@
 from base64 import b32encode, b16decode
 from typing import Optional, Dict, List
 
-from aleph_message.models import StoreMessage
+from aleph_message.models import (
+    StoreMessage,
+    ProgramMessage
+)
 
 from aleph_client import synchronous
 from aleph_client.account import _load_account, AccountFromPrivateKey
@@ -30,7 +33,10 @@
     yes_no_input
 )
 
-from .save import save_tar
+
+
+from aleph_client.commands.container.save import save_tar
+from aleph_client.commands.container.utils import create_container_volume
 
 logger = logging.getLogger(__name__)
 app = typer.Typer()
@@ -52,7 +58,7 @@ def upload_file(
             else StorageEnum.storage
         )
         logger.debug("Uploading file")
-        result = synchronous.create_store(
+        result: StoreMessage = synchronous.create_store(
             account=account,
             file_content=file_content,
             storage_engine=storage_engine,
@@ -65,12 +71,30 @@
         typer.echo(f"{json.dumps(result, indent=4)}")
     return result
 
+def MutuallyExclusiveBoolean():
+    marker = None
+    def callback(ctx: typer.Context, param: typer.CallbackParam, value: bool):
+        # Remember the first exclusive option that was set;
+        # reject any other one set afterwards
+        nonlocal marker
+        if value is False:
+            return value
+        if marker is None:
+            marker = param.name
+        if param.name != marker:
+            raise typer.BadParameter(
+                f"{param.name} is mutually exclusive with {marker}")
+        return value
+    return callback
+
+exclusivity_callback = MutuallyExclusiveBoolean()
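+
+# (Illustration, not part of the original patch: with the shared callback
+# above, `aleph container upload IMG SCRIPT --from-remote --from-daemon`
+# aborts with "from_daemon is mutually exclusive with from_remote", while
+# each flag used on its own is accepted.)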
+
 @app.command()
 def upload(
     image: str = typer.Argument(..., help="Path to an image archive exported with docker save."),
     path: str = typer.Argument(..., metavar="SCRIPT", help="A small script to start your container with parameters"),
-    from_remote: bool = typer.Option(False, "--from-remote", "-r", help=" If --from-remote, IMAGE is a registry to pull the image from. e.g: library/alpine, library/ubuntu:latest"),
-    from_daemon: bool = typer.Option(False, "--from-daemon", "-d", help=" If --from-daemon, IMAGE is an image in local docker deamon storage. You need docker installed for this command"),
+    from_remote: bool = typer.Option(False, "--from-remote", help=" If --from-remote, IMAGE is a registry to pull the image from. e.g: library/alpine, library/ubuntu:latest", callback=exclusivity_callback),
+    from_daemon: bool = typer.Option(False, "--from-daemon", help=" If --from-daemon, IMAGE is an image in local docker daemon storage. You need docker installed for this command", callback=exclusivity_callback),
+    from_created: bool = typer.Option(False, "--from-created", help=" If --from-created, IMAGE is the path to a file created with 'aleph container create'", callback=exclusivity_callback),
     channel: str = typer.Option(settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL),
     memory: int = typer.Option(settings.DEFAULT_VM_MEMORY, help="Maximum memory allocation on vm in MiB"),
     vcpus: int = typer.Option(settings.DEFAULT_VM_VCPUS, help="Number of virtual cpus to allocate."),
@@ -78,6 +102,7 @@
     private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY),
     private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE),
     docker_mountpoint: Optional[Path] = typer.Option(settings.DEFAULT_DOCKER_VOLUME_MOUNTPOINT, "--docker-mountpoint", help="The path where the created docker image volume will be mounted"),
+    optimize: bool = typer.Option(True, help="Activate volume size optimization"),
     print_messages: bool = typer.Option(False),
@@ -87,31 +112,15 @@
     Deploy a docker container on Aleph virtual machines.
     Unless otherwise specified, you don't need docker on your machine to run this command.
     """
-    if from_remote or from_daemon:
-        raise NotImplementedError()
-        # echo(f"Downloading {image}")
-        # registry = Registry()
-        # tag = "latest"
-        # if ":" in image:
-        #     l = image.split(":")
-        #     tag = l[-1]
-        #     image = l[0]
-        # print(tag)
-        # image_object = registry.pull_image(image, tag)
-        # manifest = registry.get_manifest_configuration(image, tag)
-        # image_archive = os.path.abspath(f"{str(uuid4())}.tar")
-        # image_object.write_filename(image_archive)
-        # image = image_archive
-        # print(manifest)
     typer.echo("Preparing image for vm runtime")
-    docker_data_path = os.path.abspath("docker-data")
-    save_tar(image, docker_data_path, settings=settings.DOCKER_SETTINGS)
-    if not settings.CODE_USES_SQUASHFS:
-        typer.echo("The command mksquashfs must be installed!")
-        typer.Exit(2)
-    logger.debug("Creating squashfs archive...")
-    os.system(f"mksquashfs {docker_data_path} {docker_data_path}.squashfs -noappend")
-    docker_data_path = f"{docker_data_path}.squashfs"
+    docker_data_path=image
+    if not from_created:
+        docker_data_path = os.path.abspath("docker-data.squashfs")
+        try:
+            create_container_volume(image, docker_data_path, from_remote, from_daemon, optimize, settings)
+        except Exception as e:
+            typer.echo(e)
+            raise typer.Exit(1)
     assert os.path.isfile(docker_data_path)
     encoding = Encoding.squashfs
     path = os.path.abspath(path)
@@ -140,16 +149,19 @@
     try:
         docker_upload_result: StoreMessage = upload_file(docker_data_path, account, channel, print_messages, print_code_message)
         volumes.append({
             "comment": "Docker container volume",
-            "mount": docker_mountpoint,
-            "ref": docker_upload_result["item_hash"],
+            "mount": str(docker_mountpoint),
+            "ref": str(docker_upload_result.item_hash),
             "use_latest": True,
         })
+
+        typer.echo(f"Docker image upload message address: {docker_upload_result.item_hash}")
+
         program_result: StoreMessage = upload_file(path, account, channel, print_messages, print_code_message)
 
         # Register the program
-        result = synchronous.create_program(
+        result: ProgramMessage = synchronous.create_program(
             account=account,
-            program_ref=program_result["item_hash"],
+            program_ref=program_result.item_hash,
             entrypoint=entrypoint,
             runtime=settings.DEFAULT_DOCKER_RUNTIME_ID,
             storage_engine=StorageEnum.storage,
@@ -161,25 +173,46 @@
             volumes=volumes,
             subscriptions=subscriptions,
             environment_variables={
-                "DOCKER_MOUNTPOINT": docker_mountpoint
+                "DOCKER_MOUNTPOINT": str(docker_mountpoint)
             }
         )
         logger.debug("Upload finished")
         if print_messages or print_program_message:
             typer.echo(f"{json.dumps(result, indent=4)}")
-        hash: str = result["item_hash"]
+        hash: str = result.item_hash
         hash_base32 = b32encode(b16decode(hash.upper())).strip(b"=").lower().decode()
+
         typer.echo(
             f"Your program has been uploaded on Aleph .\n\n"
             "Available on:\n"
             f"  {settings.VM_URL_PATH.format(hash=hash)}\n"
             f"  {settings.VM_URL_HOST.format(hash_base32=hash_base32)}\n"
             "Visualise on:\n  https://explorer.aleph.im/address/"
-            f"{result['chain']}/{result['sender']}/message/PROGRAM/{hash}\n"
+            f"{result.chain}/{result.sender}/message/PROGRAM/{hash}\n"
         )
     finally:
         # Prevent aiohttp unclosed connector warning
         asyncio.get_event_loop().run_until_complete(get_fallback_session().close())
+
+@app.command()
+def create(
+    image: str = typer.Argument(..., help="Path to an image archive exported with docker save."),
+    output: str = typer.Argument(..., help="The path where you want the volume to be created"),
+    from_remote: bool = typer.Option(False, "--from-remote", help=" If --from-remote, IMAGE is a registry to pull the image from. e.g: library/alpine, library/ubuntu:latest", callback=exclusivity_callback),
+    from_daemon: bool = typer.Option(False, "--from-daemon", help=" If --from-daemon, IMAGE is an image in local docker daemon storage. You need docker installed for this command", callback=exclusivity_callback),
+    optimize: bool = typer.Option(True, help="Activate volume size optimization"),
+):
+    """
+    Use a docker image to create an Aleph compatible image on your local machine.
+    You can later upload it with 'aleph container upload --from-created'
+    """
+    try:
+        create_container_volume(image, output, from_remote, from_daemon, optimize, settings)
+        typer.echo(f"Container volume created at {output}")
+    except Exception as e:
+        typer.echo(e)
+        raise typer.Exit(1)
+    return
\ No newline at end of file
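A quick sketch of the two-step flow this patch enables (paths are
hypothetical, for illustration only):

    aleph container create ./test-image.tar ./my-volume
    aleph container upload ./my-volume ./start.sh --from-created

The first command builds the Aleph-compatible volume locally; the second
reuses it without rebuilding.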
diff --git a/src/aleph_client/commands/container/save.py b/src/aleph_client/commands/container/save.py
index 08d199ca..cf27dd46 100644
--- a/src/aleph_client/commands/container/save.py
+++ b/src/aleph_client/commands/container/save.py
@@ -1,9 +1,9 @@
 import sys
-from .image import Image
-from .storage_drivers import create_storage_driver
+from aleph_client.commands.container.image import Image
+from aleph_client.commands.container.storage_drivers import create_storage_driver
 import os
 from shutil import rmtree
-from .docker_conf import docker_settings, DockerSettings
+from aleph_client.commands.container.docker_conf import docker_settings, DockerSettings
 
 dirs = {
     "vfs": 0o710,
@@ -12,15 +12,14 @@
     "swarm": 0o700,
     "runtimes": 0o700,
     "network": 0o750,
-    "containers": 0o710,
     "trust": 0o700,
     "volumes": 0o701,
     "buildkit": 0o711,
     "containers": 0o710,
     "tmp": 0o700,
-    "containerd": 0o711,
 }
 
+
 def populate_dir(output_path: str):
     print("populating")
     path = os.path.abspath(output_path)
@@ -28,11 +27,12 @@ def populate_dir(output_path: str):
     try:
         rmtree(output_path)
     except:
-        raise "" #TODO: handle error
+        raise RuntimeError(f"Could not remove existing directory '{output_path}'")  # TODO: refine error handling
     os.makedirs(output_path, 0o710)
     for d, mode in dirs.items():
         os.makedirs(os.path.join(path, d), mode)
 
+
 def save_tar(archive_path: str, output_path: str, settings: DockerSettings):
     archive_path = os.path.abspath(archive_path)
     output_path = os.path.abspath(output_path)
@@ -42,5 +42,6 @@ def save_tar(archive_path: str, output_path: str, settings: DockerSettings):
     driver = create_storage_driver(image, output_path, settings)
     driver.create_file_architecture()
 
+
 if __name__ == "__main__":
-    save_tar(sys.argv[1], sys.argv[2], docker_settings)
\ No newline at end of file
+    save_tar(sys.argv[1], sys.argv[2], docker_settings)
diff --git a/src/aleph_client/commands/container/storage_drivers.py b/src/aleph_client/commands/container/storage_drivers.py
index 106bfa6d..c47bce0b 100644
--- a/src/aleph_client/commands/container/storage_drivers.py
+++ b/src/aleph_client/commands/container/storage_drivers.py
@@ -1,5 +1,5 @@
 import tarfile
-from typing import Dict
+from typing import Dict, List
 from .image import Image
 import os
 import json
@@ -11,6 +11,7 @@
 import gzip
 from .docker_conf import DockerSettings, StorageDriverEnum
 
+
 class IStorageDriver:
     def create_file_architecture(self):
         """
@@ -70,6 +71,12 @@ def create_distribution(self, output_dir: str):
         """
         raise NotImplementedError("You must implement this method")
 
+    def optimize(self, output_dir: str):
+        """
+        Remove intermediate layer copies from output_dir once they are
+        no longer needed by the assembled image.
+        """
+        raise NotImplementedError("You must implement this method")
+ """ + raise NotImplementedError(f"You must implement this method") # Since aleph vms can be running with an unknown host configuration, @@ -104,13 +111,16 @@ def create_imagedb(self, output_dir: str): os.makedirs(os.path.join(output_dir, "imagedb"), 0o700) os.makedirs(os.path.join(output_dir, "imagedb", "content"), 0o700) os.makedirs(os.path.join(output_dir, "imagedb", "metadata"), 0o700) - os.makedirs(os.path.join(output_dir, "imagedb", "content", "sha256"), 0o700) - os.makedirs(os.path.join(output_dir, "imagedb", "metadata", "sha256"), 0o700) + os.makedirs(os.path.join(output_dir, "imagedb", + "content", "sha256"), 0o700) + os.makedirs(os.path.join(output_dir, "imagedb", + "metadata", "sha256"), 0o700) # os.makedirs(os.path.join(metadata, self.image.image_digest)) content = os.path.join(output_dir, "imagedb", "content", "sha256") path = os.path.join(content, self.image.image_digest) with open(path, "w") as f: - f.write(json.dumps(self.image.config, separators=(',', ':'))) # This file must be dumped compactly in order to keep the correct sha256 digest + # This file must be dumped compactly in order to keep the correct sha256 digest + f.write(json.dumps(self.image.config, separators=(',', ':'))) os.chmod(path, 0o0600) # with open(os.path.join(metadata, self.image.image_digest, "parent"), "w") as f: # f.write(self.image.config['config']['Image']) @@ -148,10 +158,10 @@ def save_layer_metadata(path: str, diff: str, cacheid: str, size: int, previous_ fd.write(previous_chain_id) os.chmod(dest, 0o600) - def copy_layer(src: str, dest: str) -> None: for folder in os.listdir(src): - subprocess.check_output(["cp", "-r", os.path.join(src, folder), dest]) + subprocess.check_output( + ["cp", "-r", os.path.join(src, folder), dest]) def compute_layer_size(tar_data_json_path: str) -> int: size = 0 @@ -160,12 +170,15 @@ def compute_layer_size(tar_data_json_path: str) -> int: "[" + archive.read().decode().replace("}\n{", "},\n{") + "]" - ) # fixes poor formatting from tar-split + ) # fixes poor formatting from tar-split for elem in data: if "size" in elem.keys(): - size =+ elem["size"] + size = + elem["size"] return size + def remove_unused_layers(layers_dir: str, keep: List[str]): + return + def extract_layer(path: str, archive_path: str, layerdb_subdir: str) -> int: cwd = os.getcwd() tmp_dir = tempfile.mkdtemp() @@ -183,9 +196,12 @@ def extract_layer(path: str, archive_path: str, layerdb_subdir: str) -> int: # Mandatory if one plans to export a docker image to a tar file # https://github.com/vbatts/tar-split if self.use_tarsplit: - tar_data_json = os.path.join(layerdb_subdir, "tar-split.json.gz") - os.system(f"tar-split disasm --output {tar_data_json} {tar_src} | tar -C . -x") - size = compute_layer_size(tar_data_json) # Differs from expected. Only messes with docker image size listing + tar_data_json = os.path.join( + layerdb_subdir, "tar-split.json.gz") + os.system( + f"tar-split disasm --output {tar_data_json} {tar_src} | tar -C . -x") + # Differs from expected. 
Only messes with docker image size listing + size = compute_layer_size(tar_data_json) os.remove(tar_src) # Also works, but won't be able to export images @@ -193,7 +209,7 @@ def extract_layer(path: str, archive_path: str, layerdb_subdir: str) -> int: with tarfile.open(tar_src, "r") as tar: os.remove(tar_src) tar.extractall() - size=0 + size = 0 os.rmdir(tmp_dir) os.chdir(cwd) return size @@ -201,7 +217,8 @@ def extract_layer(path: str, archive_path: str, layerdb_subdir: str) -> int: previous_cache_id = None for i in range(0, len(self.image.chain_ids)): chain_id = self.image.chain_ids[i] - layerdb_subdir = os.path.join(layerdb_path, chain_id.replace("sha256:", "")) + layerdb_subdir = os.path.join( + layerdb_path, chain_id.replace("sha256:", "")) os.makedirs(layerdb_subdir, 0o700) cache_id = (str(uuid4()) + str(uuid4())).replace("-", "") @@ -209,16 +226,17 @@ def extract_layer(path: str, archive_path: str, layerdb_subdir: str) -> int: current_layer_path = os.path.join(layers_dir, cache_id) os.makedirs(current_layer_path, 0o700) - # Merge layers # The last layer contains changes from all the previous ones if previous_cache_id: - previous_layer_path = os.path.join(layers_dir, previous_cache_id) + previous_layer_path = os.path.join( + layers_dir, previous_cache_id) copy_layer(previous_layer_path, current_layer_path) if (self.optimize): rmtree(previous_layer_path) previous_cache_id = cache_id - size = extract_layer(current_layer_path, self.image.archive_path, layerdb_subdir) + size = extract_layer(current_layer_path, + self.image.archive_path, layerdb_subdir) save_layer_metadata( path=layerdb_subdir, diff=self.image.diff_ids[i], @@ -228,6 +246,11 @@ def extract_layer(path: str, archive_path: str, layerdb_subdir: str) -> int: if i > 0 else None ) + if self.optimize: + layer_to_keep = os.path.join( + layers_dir, previous_cache_id + ) + remove_unused_layers(layers_dir, layer_to_keep) def create_storage_driver( @@ -237,4 +260,4 @@ def create_storage_driver( ) -> IStorageDriver: if settings.storage_driver.kind == StorageDriverEnum.VFS: return Vfs(image, output_dir, settings) - raise NotImplementedError("Only vfs supported now") \ No newline at end of file + raise NotImplementedError("Only vfs supported now") diff --git a/src/aleph_client/commands/container/test_data/Dockerfile b/src/aleph_client/commands/container/test_data/Dockerfile index 3ffe8384..aed8f85b 100644 --- a/src/aleph_client/commands/container/test_data/Dockerfile +++ b/src/aleph_client/commands/container/test_data/Dockerfile @@ -1,5 +1,7 @@ FROM alpine:latest WORKDIR /app +RUN touch file.txt +RUN mkdir folder.d CMD ["/bin/sh"] \ No newline at end of file diff --git a/src/aleph_client/commands/container/test_data/custom-dockerd b/src/aleph_client/commands/container/test_data/custom-dockerd old mode 100755 new mode 100644 diff --git a/src/aleph_client/commands/container/tests.py b/src/aleph_client/commands/container/tests.py index 3cbe62e6..fbde327a 100644 --- a/src/aleph_client/commands/container/tests.py +++ b/src/aleph_client/commands/container/tests.py @@ -5,8 +5,8 @@ import filecmp import subprocess from shutil import rmtree -from save import save_tar -from conf import settings +from aleph_client.commands.container.save import save_tar +from aleph_client.commands.container.docker_conf import docker_settings as settings TEST_DIR = os.path.abspath("test_data") DOCKER_DATA = os.path.join(TEST_DIR, "docker") @@ -18,8 +18,10 @@ # - VFS optimization is turned on # - tar-split is not used + def compare_folders_content(folder1: str, 
folder2: str): dcmp = filecmp.dircmp(folder1, folder2) + def recursive_cmp(dcmp): diff = dcmp.left_only + dcmp.right_only + dcmp.diff_files for sub_dcmp in dcmp.subdirs.values(): @@ -29,8 +31,10 @@ def recursive_cmp(dcmp): return recursive_cmp(dcmp) + docker_daemon: subprocess.Popen = None + class TestLoadImage(unittest.TestCase): @classmethod @@ -38,7 +42,8 @@ def setUpClass(cls) -> None: def cleanup_docker(): os.system(f"rm -rf {DOCKER_DATA}") os.system("systemctl stop docker.service") - cls.docker_daemon = subprocess.Popen(["dockerd", "--data-root", DOCKER_DATA], stderr=subprocess.DEVNULL) + cls.docker_daemon = subprocess.Popen( + ["dockerd", "--data-root", DOCKER_DATA, "--storage-driver=vfs"], stderr=subprocess.DEVNULL) time.sleep(3) def build_test_image() -> bool: @@ -78,6 +83,9 @@ def folder_cmp(self, expected_path: str, result_path: str) -> List[bool]: res = [] expected_result = os.listdir(expected_path) result = os.listdir(result_path) + if not settings.storage_driver.conf.use_tarsplit: + expected_result = list(filter( + lambda result: result != "tar-split.json.gz", expected_result)) self.assertEqual(len(expected_result), len(result)) for folder in expected_result: res.append(folder in result) @@ -89,8 +97,6 @@ def permissions_cmp(self, expected_path, actual_path): for f in expected_files: expected_mode = os.stat(os.path.join(expected_path, f)).st_mode actual_mode = os.stat(os.path.join(actual_path, f)).st_mode - if expected_mode != actual_mode: - print(os.path.join(expected_path, f), oct(expected_mode), oct(actual_mode)) res.append(expected_mode == actual_mode) return res @@ -195,12 +201,15 @@ def test_compare_layerdb_files(self) -> None: path = os.path.join("image", "vfs", "layerdb", "sha256") for folder in os.listdir(os.path.join(DOCKER_DATA, path)): for f in os.listdir(os.path.join(DOCKER_DATA, path, folder)): - if f == "size": # not ready yet + if f == "tar-split.json.gz" and not settings.storage_driver.conf.use_tarsplit: + continue + if f == "size": # not ready yet continue result_file = os.path.join(TEST_DOCKER_DATA, path, folder, f) - expected_result_file = os.path.join(DOCKER_DATA, path, folder, f) + expected_result_file = os.path.join( + DOCKER_DATA, path, folder, f) res = filecmp.cmp(result_file, expected_result_file) - if f == "cache-id": # uuid should not be identical + if f == "cache-id": # uuid should not be identical self.assertFalse(res) else: self.assertTrue(res) @@ -219,5 +228,6 @@ def test_compare_layers(self) -> None: ) self.assertEqual(len(res), 0) + if __name__ == '__main__': - unittest.main() \ No newline at end of file + unittest.main() diff --git a/src/aleph_client/commands/container/utils.py b/src/aleph_client/commands/container/utils.py new file mode 100644 index 00000000..4d3236c9 --- /dev/null +++ b/src/aleph_client/commands/container/utils.py @@ -0,0 +1,53 @@ + +import os +import logging +from shutil import rmtree + +from aleph_client.conf import Settings +from aleph_client.commands.container.save import save_tar + +logger = logging.getLogger(__name__) + + +def create_container_volume( + image: str, + output: str, + from_remote: bool, + from_daemon: bool, + optimize: bool, + settings: Settings +) -> str: + if from_remote: + raise NotImplementedError() + # echo(f"Downloading {image}") + # registry = Registry() + # tag = "latest" + # if ":" in image: + # l = image.split(":") + # tag = l[-1] + # image = l[0] + # print(tag) + # image_object = registry.pull_image(image, tag) + # manifest = registry.get_manifest_configuration(image, tag) + # image_archive = 
os.path.abspath(f"{str(uuid4())}.tar") + # image_object.write_filename(image_archive) + # image = image_archive + # print(manifest) + elif from_daemon: + output_from_daemon = f"{image}.tar" + if os.system(f"docker image inspect {image} >/dev/null 2>&1"): + raise Exception(f"Can't find image '{image}'") + if os.system(f"docker save {image} > {output_from_daemon}") != 0: + raise Exception("Error while saving docker image") + image = output_from_daemon + output = os.path.abspath(output) + tmp_output = f"{output}.tmp" + settings.DOCKER_SETTINGS.storage_driver.conf.optimize = optimize + save_tar(image, tmp_output, settings=settings.DOCKER_SETTINGS) + if not settings.CODE_USES_SQUASHFS: + raise Exception("The command mksquashfs must be installed!") + logger.debug("Creating squashfs archive...") + os.system( + f"mksquashfs {tmp_output} {output} -noappend" + ) + rmtree(tmp_output) \ No newline at end of file From 708cd945316f28bae599a93df2b2c71930299829 Mon Sep 17 00:00:00 2001 From: AmozPay Date: Thu, 1 Dec 2022 16:31:46 +0100 Subject: [PATCH 9/9] Feature: Could not split docker layers from metadata Solution: Create two different output files in the output directory --- .../commands/container/cli_command.py | 22 ++++++++++++++----- .../commands/container/docker_conf.py | 2 +- src/aleph_client/commands/container/utils.py | 8 +++---- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/src/aleph_client/commands/container/cli_command.py b/src/aleph_client/commands/container/cli_command.py index 78b03f28..732ac9df 100644 --- a/src/aleph_client/commands/container/cli_command.py +++ b/src/aleph_client/commands/container/cli_command.py @@ -115,7 +115,7 @@ def upload( typer.echo("Preparing image for vm runtime") docker_data_path=image if not from_created: - docker_data_path = os.path.abspath("docker-data.squashfs") + docker_data_path = os.path.abspath("docker-data") try: create_container_volume(image, docker_data_path, from_remote, from_daemon, optimize, settings) except Exception as e: @@ -146,15 +146,25 @@ def upload( subscriptions = None try: - docker_upload_result: StoreMessage = upload_file(docker_data_path, account, channel, print_messages, print_code_message) + docker_upload_layers_result: StoreMessage = upload_file(f"{docker_data_path}/layers", account, channel, print_messages, print_code_message) + docker_upload_metadata_result: StoreMessage = upload_file(f"{docker_data_path}/metadata", account, channel, print_messages, print_code_message) + typer.echo(f"Docker image layers upload message address: {docker_upload_layers_result.item_hash}") + typer.echo(f"Docker image metadata upload message address: {docker_upload_metadata_result.item_hash}") + + volumes.append({ + "comment": "Docker image layers", + "mount": f"{str(docker_mountpoint)}/layers", + "ref": str(docker_upload_layers_result.item_hash), + "use_latest": True, + }) + volumes.append({ - "comment": "Docker container volume", - "mount": str(docker_mountpoint), - "ref": str(docker_upload_result.item_hash), + "comment": "Docker image metadata", + "mount": f"{str(docker_mountpoint)}/metadata", + "ref": str(docker_upload_metadata_result.item_hash), "use_latest": True, }) - typer.echo(f"Docker image upload message address: {docker_upload_result.item_hash}") program_result: StoreMessage = upload_file(path, account, channel, print_messages, print_code_message) diff --git a/src/aleph_client/commands/container/docker_conf.py b/src/aleph_client/commands/container/docker_conf.py index 954b3eab..f434cc3c 100644 --- 
a/src/aleph_client/commands/container/docker_conf.py +++ b/src/aleph_client/commands/container/docker_conf.py @@ -39,5 +39,5 @@ class DockerSettings: storage_driver=StorageDriverSettings( kind=StorageDriverEnum.VFS ), - populate=True + populate=False ) \ No newline at end of file diff --git a/src/aleph_client/commands/container/utils.py b/src/aleph_client/commands/container/utils.py index 4d3236c9..c5552781 100644 --- a/src/aleph_client/commands/container/utils.py +++ b/src/aleph_client/commands/container/utils.py @@ -34,7 +34,7 @@ def create_container_volume( # image = image_archive # print(manifest) elif from_daemon: - output_from_daemon = f"{image}.tar" + output_from_daemon = f"{output}.image.tar" if os.system(f"docker image inspect {image} >/dev/null 2>&1"): raise Exception(f"Can't find image '{image}'") if os.system(f"docker save {image} > {output_from_daemon}") != 0: @@ -47,7 +47,7 @@ def create_container_volume( if not settings.CODE_USES_SQUASHFS: raise Exception("The command mksquashfs must be installed!") logger.debug("Creating squashfs archive...") - os.system( - f"mksquashfs {tmp_output} {output} -noappend" - ) + os.makedirs(output) + os.system(f"mksquashfs {tmp_output}/image/vfs {output}/metadata -noappend") + os.system(f"mksquashfs {tmp_output}/vfs {output}/layers -noappend") rmtree(tmp_output) \ No newline at end of file
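
A sketch of the volumes attached to the PROGRAM message after this final
patch, with hypothetical item hashes (the real values come from the two
STORE messages created by the upload command):

    volumes = [
        {
            "comment": "Docker image layers",
            "mount": "/docker_aleph/layers",
            "ref": "<item_hash of the layers STORE message>",
            "use_latest": True,
        },
        {
            "comment": "Docker image metadata",
            "mount": "/docker_aleph/metadata",
            "ref": "<item_hash of the metadata STORE message>",
            "use_latest": True,
        },
    ]

The VM also receives DOCKER_MOUNTPOINT=/docker_aleph as an environment
variable so the runtime can locate both squashfs volumes.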