diff --git a/.qubesbuilder b/.qubesbuilder index 4862f2c7..8e3e434d 100644 --- a/.qubesbuilder +++ b/.qubesbuilder @@ -9,3 +9,6 @@ vm: deb: build: - debian + archlinux: + build: + - archlinux diff --git a/Dockerfile.run_tests b/Dockerfile.run_tests new file mode 100644 index 00000000..629d07cc --- /dev/null +++ b/Dockerfile.run_tests @@ -0,0 +1,37 @@ +# Builds an image to run local tests +# Build using e.g. `podman build -t qos-test . -f Dockerfile.run_tests` +# Execute tests with `podman run --rm -it --cap-add SYS_ADMIN qos-test:latest` + +FROM registry.gitlab.com/qubesos/docker-images/qubesos-ci:latest +USER root +RUN dnf install -y \ + openssl \ + python3-rpm \ + sequoia-sqv \ + python3-xlib \ + python3-tqdm \ + python3-xcffib \ + python3-pyxdg \ + libnotify \ + zstd \ + xorg-x11-server-Xvfb \ + && dnf clean all +RUN dnf install -y --enablerepo=qubes-host-r4.3-current-testing scrypt + +COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv + +WORKDIR /workspace +COPY . /workspace +RUN chown -R gitlab-runner:gitlab-runner /workspace + +# PAM modified to allow tests to run (e.g. via podman) on systems (=OS outside the container) which require a root pwd +RUN echo "gitlab-runner ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers \ + && echo "#%PAM-1.0" > /etc/pam.d/sudo \ + && echo "auth sufficient pam_permit.so" >> /etc/pam.d/sudo \ + && echo "account sufficient pam_permit.so" >> /etc/pam.d/sudo \ + && echo "session required pam_permit.so" >> /etc/pam.d/sudo + +USER gitlab-runner +ENV USER=gitlab-runner +ENV ENABLE_SLOW_TESTS=1 +CMD xvfb-run uv run python -m unittest discover -s qubesadmin/tests -p '*.py' -v \ No newline at end of file diff --git a/archlinux/PKGBUILD.in b/archlinux/PKGBUILD.in new file mode 100644 index 00000000..0571effb --- /dev/null +++ b/archlinux/PKGBUILD.in @@ -0,0 +1,45 @@ +#!/bin/bash +pkgname=qubes-core-admin-client +pkgver=@VERSION@ +pkgrel=@REL@ +pkgdesc="This package include management tools, like qvm-*." +arch=("x86_64") +url="https://qubes-os.org/" +license=('GPL') +depends=( + 'scrypt' + 'xorg-xrandr' + 'python-setuptools' + 'python-tqdm' + 'python-xcffib' + 'python-xlib' + 'python-yaml' + 'qubes-repo-templates' + 'qubes-rpm-oxide' + ) +makedepends=( + 'make' + 'python-build' + 'python-installer' + 'python-setuptools' + 'python-wheel' +) + +_pkgnvr="${pkgname}-${pkgver}-${pkgrel}" +changelog=debian/changelog +source=("${_pkgnvr}.tar.gz") +md5sums=(SKIP) + +build() { + cd "${_pkgnvr}" + python -m build --wheel --no-isolation +} + +package() { + cd "${_pkgnvr}" + # shellcheck disable=SC2154 + python -m installer --destdir="$pkgdir" dist/*.whl + make install-misc DESTDIR="$pkgdir" +} + +# vim:set tabstop=4 shiftwidth=4 softtabstop=4 expandtab: diff --git a/qubesadmin/app.py b/qubesadmin/app.py index 375be8c9..b4381b36 100644 --- a/qubesadmin/app.py +++ b/qubesadmin/app.py @@ -32,8 +32,11 @@ import sys import logging +import typing from logging import Logger -from typing import Literal +from subprocess import Popen +from typing import IO, TypeVar +from collections.abc import Generator, Iterable import qubesadmin.base import qubesadmin.exc @@ -41,62 +44,72 @@ import qubesadmin.storage import qubesadmin.utils import qubesadmin.vm +from qubesadmin.label import Label +from qubesadmin.vm import Klass, PowerState import qubesadmin.config import qubesadmin.device_protocol from qubesadmin.vm import QubesVM try: - import qubesdb + import qubesdb # type: ignore has_qubesdb = True except ImportError: has_qubesdb = False +# ["mic", "block", "pci", "usb", "webcam"] +# but can be extended +DeviceClass = str + class VMCollection: """Collection of VMs objects""" - def __init__(self, app): + def __init__(self, app: "QubesBase"): self.app = app - self._vm_list = None - self._vm_objects = {} + # TODO we should properly document what's in / the + # purpose of _vm_dict and _vm_objects + self._vm_dict: dict[str, dict[str, str]] = {} + self._vm_objects: dict[str, QubesVM] = {} + self._vm_dict_initialized: bool = False - def clear_cache(self, invalidate_name=None): + def clear_cache(self, invalidate_name: str | None=None) -> None: """Clear cached list of VMs If *invalidate_name* is given, remove that object from cache explicitly too. """ - self._vm_list = None + self._vm_dict_initialized = False + self._vm_dict = {} if invalidate_name: self._vm_objects.pop(invalidate_name, None) - def refresh_cache(self, force=False): + def refresh_cache(self, force: bool=False) -> None: """Refresh cached list of VMs""" - if not force and self._vm_list is not None: + if not force and self._vm_dict_initialized: return vm_list_data = self.app.qubesd_call("dom0", "admin.vm.List") - new_vm_list = {} + new_vm_dict = {} # FIXME: this will probably change for vm_data in vm_list_data.splitlines(): vm_name, props = vm_data.decode("ascii").split(" ", 1) vm_name = str(vm_name) props = props.split(" ") - new_vm_list[vm_name] = dict( + new_vm_dict[vm_name] = dict( [vm_prop.split("=", 1) for vm_prop in props] ) # if cache not enabled, drop power state if not self.app.cache_enabled: try: - del new_vm_list[vm_name]["state"] + del new_vm_dict[vm_name]["state"] except KeyError: pass - self._vm_list = new_vm_list + self._vm_dict = new_vm_dict for name, vm in list(self._vm_objects.items()): - if vm.name not in self._vm_list: + if vm.name not in self._vm_dict: # VM no longer exists del self._vm_objects[name] - elif vm.klass != self._vm_list[vm.name]["class"]: + elif vm.klass != self._vm_dict[vm.name]["class"]: # VM class have changed del self._vm_objects[name] # TODO: some generation ID, to detect VM re-creation @@ -104,6 +117,7 @@ def refresh_cache(self, force=False): # renamed self._vm_objects[vm.name] = vm del self._vm_objects[name] + self._vm_dict_initialized = True def __getitem__(self, item: str | QubesVM) -> QubesVM: if isinstance(item, QubesVM): @@ -118,21 +132,24 @@ def get_blind(self, item: str) -> QubesVM: and checking if exists """ if item not in self._vm_objects: - cls = qubesadmin.vm.QubesVM # provide class name to constructor, if already cached (which can be # done by 'item not in self' check above, unless blind_mode is # enabled - klass = None - power_state = None - if self._vm_list and item in self._vm_list: - klass = self._vm_list[item]["class"] - power_state = self._vm_list[item].get("state") - self._vm_objects[item] = cls( + klass: Klass | None = None + power_state: PowerState | None = None + if item in self._vm_dict: + klass = typing.cast(Klass | None, self._vm_dict[item]["class"]) + power_state = typing.cast(PowerState | None, + self._vm_dict[item].get("state")) + self._vm_objects[item] = QubesVM( self.app, item, klass=klass, power_state=power_state ) return self._vm_objects[item] - def get(self, item, default=None) -> QubesVM: + T = TypeVar("T") + + def get(self, item: str | QubesVM, default: QubesVM | T=None)\ + -> QubesVM | T: """ Get a VM object, or return *default* if it can't be found. """ @@ -141,30 +158,30 @@ def get(self, item, default=None) -> QubesVM: except KeyError: return default - def __contains__(self, item): + def __contains__(self, item: QubesVM | str) -> bool: if isinstance(item, qubesadmin.vm.QubesVM): item = item.name self.refresh_cache() - return item in self._vm_list + return item in self._vm_dict - def __delitem__(self, key): + def __delitem__(self, key: str) -> None: self.app.qubesd_call(key, "admin.vm.Remove") self.clear_cache() - def __iter__(self): + def __iter__(self) -> Generator[QubesVM, None, None]: self.refresh_cache() - for vm in sorted(self._vm_list): + for vm in sorted(self._vm_dict): yield self[vm] - def keys(self): + def keys(self) -> Iterable[str]: """Get list of VM names.""" self.refresh_cache() - return self._vm_list.keys() + return self._vm_dict.keys() - def values(self): + def values(self) -> list[QubesVM]: """Get list of VM objects.""" self.refresh_cache() - return [self[name] for name in self._vm_list] + return [self[name] for name in self._vm_dict] class QubesBase(qubesadmin.base.PropertyHolder): @@ -179,11 +196,11 @@ class in py:class:`qubesadmin.Qubes` instead, which points at #: domains (VMs) collection domains: VMCollection #: labels collection - labels: qubesadmin.base.WrapperObjectsCollection + labels: qubesadmin.base.WrapperObjectsCollection[Label] #: storage pools - pools: qubesadmin.base.WrapperObjectsCollection + pools: qubesadmin.base.WrapperObjectsCollection[qubesadmin.storage.Pool] #: type of qubesd connection: either 'socket' or 'qrexec' - qubesd_connection_type: Literal["socket", "qrexec"] + qubesd_connection_type: typing.Literal["socket", "qrexec"] #: logger log: Logger #: do not check for object (VM, label etc) existence before really needed @@ -191,7 +208,7 @@ class in py:class:`qubesadmin.Qubes` instead, which points at #: cache retrieved properties values cache_enabled: bool = False - def __init__(self): + def __init__(self) -> None: super().__init__(self, "admin.property.", "dom0") self.domains = VMCollection(self) self.labels = qubesadmin.base.WrapperObjectsCollection( @@ -201,27 +218,28 @@ def __init__(self): self, "admin.pool.List", qubesadmin.storage.Pool ) #: cache for available storage pool drivers and options to create them - self._pool_drivers = None + self._pool_drivers: dict[str, list[str]] | None = None self.log = logging.getLogger("app") self._local_name = None - def list_vmclass(self): + def list_vmclass(self) -> list[Klass]: """Call Qubesd in order to obtain the vm classes list""" vmclass = ( self.qubesd_call("dom0", "admin.vmclass.List").decode().splitlines() ) return sorted(vmclass) - def list_deviceclass(self): + def list_deviceclass(self) -> list[DeviceClass]: """Call Qubesd in order to obtain the device classes list""" deviceclasses = ( self.qubesd_call("dom0", "admin.deviceclass.List") .decode() .splitlines() ) + return sorted(deviceclasses) - def _refresh_pool_drivers(self): + def _refresh_pool_drivers(self) -> None: """ Refresh cached storage pool drivers and their parameters. @@ -241,17 +259,19 @@ def _refresh_pool_drivers(self): self._pool_drivers = pool_drivers @property - def pool_drivers(self): + def pool_drivers(self) -> Iterable[str]: """Available storage pool drivers""" self._refresh_pool_drivers() + assert self._pool_drivers is not None return self._pool_drivers.keys() - def pool_driver_parameters(self, driver): + def pool_driver_parameters(self, driver: str) -> list[str]: """Parameters to initialize storage pool using given driver""" self._refresh_pool_drivers() + assert self._pool_drivers is not None return self._pool_drivers[driver] - def add_pool(self, name, driver, **kwargs): + def add_pool(self, name: str, driver: str, **kwargs) -> None: """Add a storage pool to config :param name: name of storage pool to create @@ -269,12 +289,12 @@ def add_pool(self, name, driver, **kwargs): "dom0", "admin.pool.Add", driver, payload.encode("utf-8") ) - def remove_pool(self, name): + def remove_pool(self, name: str) -> None: """Remove a storage pool""" self.qubesd_call("dom0", "admin.pool.Remove", name, None) @property - def local_name(self): + def local_name(self) -> str: """Get localhost name""" if not self._local_name: local_name = None @@ -292,7 +312,7 @@ def local_name(self): return self._local_name - def get_label(self, label): + def get_label(self, label: str | int) -> Label: """Get label as identified by index or name :throws QubesLabelNotFoundError: when label is not found @@ -309,10 +329,10 @@ def get_label(self, label): for i in self.labels.values(): if i.index == int(label): return i - raise qubesadmin.exc.QubesLabelNotFoundError(label) + raise qubesadmin.exc.QubesLabelNotFoundError(str(label)) @staticmethod - def get_vm_class(clsname): + def get_vm_class(clsname: str) -> str: """Find the class for a domain. Compatibility function, client tools use str to identify domain classes. @@ -324,8 +344,10 @@ def get_vm_class(clsname): return clsname def add_new_vm( - self, cls, name, label, template=None, pool=None, pools=None - ): + self, cls: str | type[QubesVM], name: str, label: str, + template: str | QubesVM | None=None, pool: str | None=None, + pools: dict | None=None + ) -> QubesVM: """Create new Virtual Machine Example usage with custom storage pools: @@ -381,16 +403,16 @@ def add_new_vm( def clone_vm( self, - src_vm, - new_name, - new_cls=None, + src_vm: str | QubesVM, + new_name: str, + new_cls: str | None=None, *, - pool=None, - pools=None, - ignore_errors=False, - ignore_volumes=None, - ignore_devices=False, - ): + pool: str | None=None, + pools: dict | None=None, + ignore_errors: bool=False, + ignore_volumes: list | None=None, + ignore_devices: bool=False, + ) -> QubesVM: # pylint: disable=too-many-statements # pylint: disable=too-many-branches """Clone Virtual Machine @@ -626,8 +648,9 @@ def clone_vm( return dst_vm def qubesd_call( - self, dest, method, arg=None, payload=None, payload_stream=None - ): + self, dest: str | None, method: str, arg: str | None=None, + payload: bytes | None=None, payload_stream: IO | None=None + ) -> bytes: """ Execute Admin API method. @@ -650,16 +673,16 @@ def qubesd_call( def run_service( self, - dest, - service, - user=None, + dest: str, + service: str, + user: str | None=None, *, - filter_esc=False, - localcmd=None, - wait=True, - autostart=True, + filter_esc: bool=False, + localcmd: str | None=None, + wait: bool=True, + autostart: bool=True, **kwargs, - ): + ) -> Popen: """Run qrexec service in a given destination *kwargs* are passed verbatim to :py:meth:`subprocess.Popen`. @@ -681,7 +704,9 @@ def run_service( ) @staticmethod - def _call_with_stream(command, payload, payload_stream): + def _call_with_stream(command: str | list[str], payload: bytes | None, + payload_stream: IO)\ + -> tuple[Popen, bytes, bytes]: """Helper method to pass data to qubesd. Calls a command with payload and payload_stream as input. @@ -702,6 +727,7 @@ def _call_with_stream(command, payload, payload_stream): # because the process can get blocked on stdout or stderr pipe. # However, in practice the output should be always smaller # than 4K. + assert proc.stdin is not None proc.stdin.write(payload) try: shutil.copyfileobj(payload_stream, proc.stdin) @@ -714,7 +740,8 @@ def _call_with_stream(command, payload, payload_stream): stdout, stderr = proc.communicate() return proc, stdout, stderr - def _invalidate_cache(self, subject, event, name, **kwargs): + def _invalidate_cache(self, subject: QubesVM | None, + event: str, name: str, **kwargs) -> None: """Invalidate cached value of a property. This method is designed to be hooked as an event handler for: @@ -735,15 +762,18 @@ def _invalidate_cache(self, subject, event, name, **kwargs): :return: none """ # pylint: disable=unused-argument if subject is None: - subject = self + subject_or_self = self + else: + subject_or_self = subject try: # pylint: disable=protected-access - del subject._properties_cache[name] + del subject_or_self._properties_cache[name] except KeyError: pass - def _update_power_state_cache(self, subject, event, **kwargs): + def _update_power_state_cache(self, subject: QubesVM, + event: str, **kwargs) -> None: """Update cached VM power state. This method is designed to be hooked as an event handler for: @@ -785,7 +815,7 @@ def _update_power_state_cache(self, subject, event, **kwargs): # pylint: disable=protected-access subject._power_state_cache = power_state - def _invalidate_cache_all(self): + def _invalidate_cache_all(self) -> None: """Invalidate all cached data @@ -818,8 +848,9 @@ class QubesLocal(QubesBase): qubesd_connection_type = "socket" def qubesd_call( - self, dest, method, arg=None, payload=None, payload_stream=None - ): + self, dest: str | None, method: str, arg: str | None=None, + payload: bytes | None=None, payload_stream: IO | None=None + ) -> bytes: """ Execute Admin API method. @@ -847,13 +878,15 @@ def qubesd_call( raise qubesadmin.exc.QubesDaemonCommunicationError( "{} not found".format(method_path) ) + assert dest is not None command = [ "env", "QREXEC_REMOTE_DOMAIN=dom0", "QREXEC_REQUESTED_TARGET=" + dest, method_path, - arg, ] + if arg is not None: + command.append(arg) if os.getuid() != 0: command.insert(0, "sudo") (_, stdout, _) = self._call_with_stream( @@ -882,16 +915,16 @@ def qubesd_call( def run_service( self, - dest, - service, - user=None, + dest: str, + service: str, + user: str | None=None, *, - filter_esc=False, - localcmd=None, - wait=True, - autostart=True, + filter_esc: bool=False, + localcmd: str | None=None, + wait: bool=True, + autostart: bool=True, **kwargs, - ): + ) -> Popen: """Run qrexec service in a given destination :param str dest: Destination - may be a VM name or empty @@ -995,8 +1028,9 @@ class QubesRemote(QubesBase): qubesd_connection_type = "qrexec" def qubesd_call( - self, dest, method, arg=None, payload=None, payload_stream=None - ): + self, dest: str | None, method: str, arg: str | None=None, + payload: bytes | None=None, payload_stream: IO | None=None + ) -> bytes: """ Execute Admin API method. @@ -1013,6 +1047,7 @@ def qubesd_call( .. warning:: *payload_stream* will get closed by this function """ service_name = method + assert dest is not None if arg is not None: service_name += "+" + arg command = [qubesadmin.config.QREXEC_CLIENT_VM, dest, service_name] @@ -1037,16 +1072,16 @@ def qubesd_call( def run_service( self, - dest, - service, - user=None, + dest: str, + service: str, + user: str | None=None, *, - filter_esc=False, - localcmd=None, - wait=True, - autostart=True, + filter_esc: bool=False, + localcmd: str | None=None, + wait: bool=True, + autostart: bool=True, **kwargs, - ): + ) -> Popen: """Run qrexec service in a given destination :param str dest: Destination - may be a VM name or empty diff --git a/qubesadmin/tests/app.py b/qubesadmin/tests/app.py index 40cea0e1..254869ba 100644 --- a/qubesadmin/tests/app.py +++ b/qubesadmin/tests/app.py @@ -1026,7 +1026,8 @@ def _call_test_service_with_payload_stream( os.chmod(service_path, 0o755) with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR', - self.tmpdir): + self.tmpdir), \ + mock.patch('os.getuid', return_value=0): value = self.app.qubesd_call( 'test-vm', 'test.service', 'some-arg', payload=payload, payload_stream=payload_stream) diff --git a/qubesadmin/vm/__init__.py b/qubesadmin/vm/__init__.py index 4a93dfbd..72b7b760 100644 --- a/qubesadmin/vm/__init__.py +++ b/qubesadmin/vm/__init__.py @@ -19,35 +19,41 @@ # with this program; if not, see . """Qubes VM objects.""" - +from __future__ import annotations import logging import shlex import subprocess +import typing import warnings from logging import Logger +from typing import Literal, TypeVar -import qubesadmin.base import qubesadmin.exc import qubesadmin.storage import qubesadmin.features import qubesadmin.devices import qubesadmin.device_protocol import qubesadmin.firewall -import qubesadmin.tags + +if typing.TYPE_CHECKING: + import qubesadmin.base + + +# ["AppVM", "AdminVM", "TemplateVM", "DispVM", "StandaloneVM"] +# but can be extended +Klass = str +PowerState = Literal["Transient", "Running", "Halted", "Paused", +"Suspended", "Halting", "Dying", "Crashed", "NA"] class QubesVM(qubesadmin.base.PropertyHolder): """Qubes domain.""" log: Logger - tags: qubesadmin.tags.Tags - features: qubesadmin.features.Features - devices: qubesadmin.devices.DeviceManager - firewall: qubesadmin.firewall.Firewall def __init__(self, app, name, klass=None, power_state=None):