diff --git a/qubesadmin/backup/core3.py b/qubesadmin/backup/core3.py index bb540680..ce47d27f 100644 --- a/qubesadmin/backup/core3.py +++ b/qubesadmin/backup/core3.py @@ -28,12 +28,10 @@ import qubesadmin.backup import qubesadmin.firewall -from qubesadmin import device_protocol +from qubesadmin import utils from qubesadmin.vm import QubesVM - - class Core3VM(qubesadmin.backup.BackupVM): '''VM object''' @property @@ -138,7 +136,7 @@ def import_core3_vm(self, element: _Element) -> None: for opt_node in node.findall('./option'): opt_name = opt_node.get('name') options[opt_name] = opt_node.text - options['required'] = device_protocol.qbool( + options['required'] = utils.qbool( node.get('required', 'yes')) vm.devices[bus_name][(backend_domain, port_id)] = options diff --git a/qubesadmin/device_protocol.py b/qubesadmin/device_protocol.py index c7b60a54..6234266d 100644 --- a/qubesadmin/device_protocol.py +++ b/qubesadmin/device_protocol.py @@ -29,15 +29,20 @@ The same in `qubes-core-admin` and `qubes-core-admin-client`, should be moved to one place. """ - +from __future__ import annotations import string import sys from enum import Enum -from typing import Optional, Dict, Any, List, Union, Tuple, Callable +from typing import Any, TYPE_CHECKING +from collections.abc import Callable import qubesadmin.exc + from qubesadmin.exc import QubesValueError +if TYPE_CHECKING: + from qubesadmin.vm import QubesVM + from qubesadmin.app import VMCollection class ProtocolError(AssertionError): @@ -46,51 +51,27 @@ class ProtocolError(AssertionError): """ -QubesVM = 'qubesadmin.vm.QubesVM' - - class UnexpectedDeviceProperty(qubesadmin.exc.QubesException, ValueError): """ Device has unexpected property such as backend_domain, devclass etc. """ -def qbool(value): - """ - Property setter for boolean properties. - - It accepts (case-insensitive) ``'0'``, ``'no'`` and ``false`` as - :py:obj:`False` and ``'1'``, ``'yes'`` and ``'true'`` as - :py:obj:`True`. - """ - - if isinstance(value, str): - lcvalue = value.lower() - if lcvalue in ("0", "no", "false", "off"): - return False - if lcvalue in ("1", "yes", "true", "on"): - return True - raise QubesValueError( - "Invalid literal for boolean property: {!r}".format(value) - ) - - return bool(value) - - class DeviceSerializer: """ Group of method for serialization of device properties. """ - ALLOWED_CHARS_KEY = set( + ALLOWED_CHARS_KEY: set[str] = set( string.digits + string.ascii_letters + r"!#$%&()*+,-./:;<>?@[\]^_{|}~" ) - ALLOWED_CHARS_PARAM = ALLOWED_CHARS_KEY.union(set(string.punctuation + " ")) + ALLOWED_CHARS_PARAM: set[str]\ + = ALLOWED_CHARS_KEY.union(set(string.punctuation + " ")) @classmethod def unpack_properties( cls, untrusted_serialization: bytes - ) -> Tuple[Dict, Dict]: + ) -> tuple[dict, dict]: """ Unpacks basic port properties from a serialized encoded string. @@ -106,8 +87,8 @@ def unpack_properties( "ascii", errors="strict" ).strip() - properties: Dict[str, str] = {} - options: Dict[str, str] = {} + properties: dict[str, str] = {} + options: dict[str, str] = {} if not ut_decoded: return properties, options @@ -155,7 +136,7 @@ def unpack_properties( return properties, options @classmethod - def pack_property(cls, key: str, value: Optional[str]): + def pack_property(cls, key: str, value: object) -> bytes: """ Add property `key=value` to serialization. """ @@ -175,8 +156,8 @@ def pack_property(cls, key: str, value: Optional[str]): @staticmethod def parse_basic_device_properties( - expected_device: "VirtualDevice", properties: Dict[str, Any] - ): + expected_device: VirtualDevice, properties: dict + ) -> None: """ Validates properties against an expected port configuration. @@ -228,7 +209,7 @@ def parse_basic_device_properties( properties["port"] = expected @staticmethod - def serialize_str(value: str) -> str: + def serialize_str(value: object) -> str: """ Serialize python string to ensure consistency. """ @@ -245,7 +226,7 @@ def deserialize_str(value: str) -> str: def sanitize_str( untrusted_value: str, allowed_chars: set, - replace_char: Optional[str] = None, + replace_char: str | None = None, error_message: str = "", ) -> str: """ @@ -281,18 +262,18 @@ class Port: def __init__( self, - backend_domain: Optional[QubesVM], - port_id: Optional[str], - devclass: Optional[str], + backend_domain: QubesVM | None, + port_id: str | None, + devclass: str | None, ): self.__backend_domain = backend_domain self.__port_id = port_id self.__devclass = devclass - def __hash__(self): + def __hash__(self) -> int: return hash((self.backend_name, self.port_id, self.devclass)) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if isinstance(other, Port): return ( self.backend_name == other.backend_name @@ -301,7 +282,7 @@ def __eq__(self, other): ) return False - def __lt__(self, other): + def __lt__(self, other: object) -> bool: if isinstance(other, Port): return (self.backend_name, self.devclass, self.port_id) < ( other.backend_name, @@ -313,10 +294,10 @@ def __lt__(self, other): "is not supported" ) - def __repr__(self): + def __repr__(self) -> str: return f"{self.backend_name}+{self.port_id}" - def __str__(self): + def __str__(self) -> str: return f"{self.backend_name}:{self.port_id}" @property @@ -328,8 +309,9 @@ def backend_name(self) -> str: @classmethod def from_qarg( - cls, representation: str, devclass, domains, blind=False - ) -> "Port": + cls, representation: str, devclass: str, + domains: VMCollection, blind: bool=False + ) -> Port: """ Parse qrexec argument + to retrieve Port. """ @@ -341,8 +323,9 @@ def from_qarg( @classmethod def from_str( - cls, representation: str, devclass, domains, blind=False - ) -> "Port": + cls, representation: str, devclass: str, + domains: VMCollection, blind: bool=False + ) -> Port: """ Parse string : to retrieve Port. """ @@ -355,7 +338,7 @@ def from_str( @classmethod def _parse( cls, representation: str, devclass: str, get_domain: Callable, sep: str - ) -> "Port": + ) -> Port: """ Parse string representation and return instance of Port. """ @@ -375,7 +358,7 @@ def port_id(self) -> str: return "*" @property - def backend_domain(self) -> Optional[QubesVM]: + def backend_domain(self) -> QubesVM | None: """Which domain exposed this port. (immutable)""" return self.__backend_domain @@ -390,7 +373,7 @@ def devclass(self) -> str: return "peripheral" @property - def has_devclass(self): + def has_devclass(self) -> bool: """Returns True if devclass is set.""" return self.__devclass is not None @@ -401,10 +384,10 @@ class AnyPort(Port): def __init__(self, devclass: str): super().__init__(None, "*", devclass) - def __repr__(self): + def __repr__(self) -> str: return "*" - def __str__(self): + def __str__(self) -> str: return "*" @@ -419,18 +402,18 @@ class VirtualDevice: def __init__( self, - port: Optional[Port] = None, - device_id: Optional[str] = None, + port: Port | None = None, + device_id: str | None = None, ): assert not isinstance(port, AnyPort) or device_id is not None - self.port: Optional[Port] = port # type: ignore + self.port: Port | None = port self._device_id = device_id - def clone(self, **kwargs) -> "VirtualDevice": + def clone(self, **kwargs) -> VirtualDevice: """ Clone object and substitute attributes with explicitly given. """ - attr: Dict[str, Any] = { + attr: dict[str, Any] = { # noqa:ANN401 "port": self.port, "device_id": self.device_id, } @@ -443,7 +426,7 @@ def port(self) -> Port: return self._port @port.setter - def port(self, value: Union[Port, str, None]): + def port(self, value: Port | str | None) -> None: # pylint: disable=missing-function-docstring if isinstance(value, Port): self._port = value @@ -469,7 +452,7 @@ def is_device_id_set(self) -> bool: return self._device_id is not None @property - def backend_domain(self) -> Optional[QubesVM]: + def backend_domain(self) -> QubesVM | None: # pylint: disable=missing-function-docstring return self.port.backend_domain @@ -499,10 +482,10 @@ def description(self) -> str: return "any device" return self.device_id - def __hash__(self): + def __hash__(self) -> int: return hash((self.port, self.device_id)) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if isinstance(other, (VirtualDevice, DeviceAssignment)): result = ( self.port == other.port and self.device_id == other.device_id @@ -512,7 +495,7 @@ def __eq__(self, other): return self.port == other and self.device_id == "*" return super().__eq__(other) - def __lt__(self, other): + def __lt__(self, other: object) -> bool: """ Desired order (important for auto-attachment): @@ -543,11 +526,11 @@ def __lt__(self, other): "is not supported" ) - def __repr__(self): + def __repr__(self) -> str: return f"{self.port!r}:{self.device_id}" @property - def repr_for_qarg(self): + def repr_for_qarg(self) -> str: """Object representation for qrexec argument""" res = repr(self).replace(":", "+") # replace '?' in category @@ -555,18 +538,18 @@ def repr_for_qarg(self): res = res.replace(unknown_dev, "_" * len(unknown_dev)) return res.replace("*", "_") - def __str__(self): + def __str__(self) -> str: return f"{self.port}:{self.device_id}" @classmethod def from_qarg( cls, representation: str, - devclass: Optional[str], - domains, + devclass: str | None, + domains: VMCollection, blind: bool = False, - backend: Optional[QubesVM] = None, - ) -> "VirtualDevice": + backend: QubesVM | None = None, + ) -> VirtualDevice: """ Parse qrexec argument +: to get device info """ @@ -583,15 +566,16 @@ def from_qarg( def from_str( cls, representation: str, - devclass: Optional[str], - domains, + devclass: str | None, + domains: VMCollection | None, blind: bool = False, - backend: Optional[QubesVM] = None, - ) -> "VirtualDevice": + backend: QubesVM | None = None, + ) -> VirtualDevice: """ Parse string +: to get device info """ if backend is None: + assert domains is not None if blind: get_domain = domains.get_blind else: @@ -604,15 +588,16 @@ def from_str( def _parse( cls, representation: str, - devclass: Optional[str], - get_domain: Callable, - backend: Optional[QubesVM], + devclass: str | None, + get_domain: Callable | None, + backend: QubesVM | None, sep: str, - ) -> "VirtualDevice": + ) -> VirtualDevice: """ Parse string representation and return instance of VirtualDevice. """ if backend is None: + assert get_domain is not None backend_name, identity = representation.split(sep, 1) if backend_name == "_": backend_name = "*" @@ -690,7 +675,7 @@ class DeviceCategory(Enum): PCI_USB = ("p0c03**",) @staticmethod - def from_str(interface_encoding: str) -> "DeviceCategory": + def from_str(interface_encoding: str) -> DeviceCategory: """ Returns `DeviceCategory` from data encoded in string. """ @@ -721,7 +706,7 @@ class DeviceInterface: Peripheral device interface wrapper. """ - def __init__(self, interface_encoding: str, devclass: Optional[str] = None): + def __init__(self, interface_encoding: str, devclass: str | None = None): ifc_padded = interface_encoding.ljust(6, "*") if devclass: if len(ifc_padded) > 6: @@ -762,7 +747,7 @@ def __init__(self, interface_encoding: str, devclass: Optional[str] = None): self._category = DeviceCategory.from_str(self._interface_encoding) @property - def devclass(self) -> Optional[str]: + def devclass(self) -> str | None: """Immutable Device class such like: 'usb', 'pci' etc.""" return self._devclass @@ -772,12 +757,12 @@ def category(self) -> DeviceCategory: return self._category @classmethod - def unknown(cls) -> "DeviceInterface": + def unknown(cls) -> DeviceInterface: """Value for unknown device interface.""" return cls("?******") @staticmethod - def from_str_bulk(interfaces: Optional[str]) -> List["DeviceInterface"]: + def from_str_bulk(interfaces: str | None) -> list[DeviceInterface]: """Interprets string of interfaces as list of `DeviceInterface`. Examples: @@ -796,18 +781,18 @@ def from_str_bulk(interfaces: Optional[str]) -> List["DeviceInterface"]: for i in range(0, len(interfaces), 7) ] - def __repr__(self): + def __repr__(self) -> str: return self._interface_encoding - def __hash__(self): + def __hash__(self) -> int: return hash(repr(self)) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if not isinstance(other, DeviceInterface): return False return repr(self) == repr(other) - def __str__(self): + def __str__(self) -> str: if self.devclass == "block": return "Block Device" if self.devclass in ("usb", "pci"): @@ -848,7 +833,7 @@ def __str__(self): return repr(self) @staticmethod - def _load_classes(bus: str): + def _load_classes(bus: str) -> dict: """ List of known device classes, subclasses and programming interfaces. """ @@ -884,7 +869,7 @@ def _load_classes(bus: str): return result - def matches(self, other: "DeviceInterface") -> bool: + def matches(self, other: DeviceInterface) -> bool: """ Check if this `DeviceInterface` (pattern) matches given one. @@ -913,15 +898,15 @@ def __init__( self, port: Port, *, - vendor: Optional[str] = None, - product: Optional[str] = None, - manufacturer: Optional[str] = None, - name: Optional[str] = None, - serial: Optional[str] = None, - interfaces: Optional[List[DeviceInterface]] = None, - parent: Optional["DeviceInfo"] = None, - attachment: Optional[QubesVM] = None, - device_id: Optional[str] = None, + vendor: str | None = None, + product: str | None = None, + manufacturer: str | None = None, + name: str | None = None, + serial: str | None = None, + interfaces: list[DeviceInterface] | None = None, + parent: DeviceInfo | None = None, + attachment: QubesVM | None = None, + device_id: str | None = None, **kwargs, ): super().__init__(port, device_id) @@ -1044,7 +1029,7 @@ def description(self) -> str: return f"{cat}: {vendor} {prod}" @property - def interfaces(self) -> List[DeviceInterface]: + def interfaces(self) -> list[DeviceInterface]: """ Non-empty list of device interfaces. @@ -1055,7 +1040,7 @@ def interfaces(self) -> List[DeviceInterface]: return self._interfaces @property - def parent_device(self) -> Optional[VirtualDevice]: + def parent_device(self) -> VirtualDevice | None: """ The parent device, if any. @@ -1065,7 +1050,7 @@ def parent_device(self) -> Optional[VirtualDevice]: return self._parent @property - def subdevices(self) -> List[VirtualDevice]: + def subdevices(self) -> list[VirtualDevice]: """ The list of children devices if any. @@ -1082,7 +1067,7 @@ def subdevices(self) -> List[VirtualDevice]: ] @property - def attachment(self) -> Optional[QubesVM]: + def attachment(self) -> QubesVM | None: """ VM to which device is attached (frontend domain). """ @@ -1135,8 +1120,8 @@ def deserialize( cls, serialization: bytes, expected_backend_domain: QubesVM, - expected_devclass: Optional[str] = None, - ) -> "DeviceInfo": + expected_devclass: str | None = None, + ) -> DeviceInfo: """ Recovers a serialized object, see: :py:meth:`serialize`. """ @@ -1160,7 +1145,7 @@ def deserialize( @classmethod def _deserialize( cls, untrusted_serialization: bytes, expected_device: VirtualDevice - ) -> "DeviceInfo": + ) -> DeviceInfo: """ Actually deserializes the object. """ @@ -1217,7 +1202,7 @@ def device_id(self) -> str: return self._device_id @device_id.setter - def device_id(self, value): + def device_id(self, value: str) -> None: # Do not auto-override value like in super class self._device_id = value @@ -1226,7 +1211,7 @@ class UnknownDevice(DeviceInfo): """Unknown device - for example, exposed by domain not running currently""" @staticmethod - def from_device(device: VirtualDevice) -> "UnknownDevice": + def from_device(device: VirtualDevice) -> UnknownDevice: """ Return `UnknownDevice` based on any virtual device. """ @@ -1252,9 +1237,9 @@ class DeviceAssignment: def __init__( self, device: VirtualDevice, - frontend_domain=None, - options=None, - mode: Union[str, AssignmentMode] = "manual", + frontend_domain: QubesVM | None=None, + options: dict[str, object] | None=None, + mode: str | AssignmentMode = AssignmentMode.MANUAL, ): if isinstance(device, DeviceInfo): device = VirtualDevice(device.port, device.device_id) @@ -1272,12 +1257,12 @@ def new( backend_domain: QubesVM, port_id: str, devclass: str, - device_id: Optional[str] = None, + device_id: str | None = None, *, - frontend_domain: Optional[QubesVM] = None, - options=None, - mode: Union[str, AssignmentMode] = "manual", - ) -> "DeviceAssignment": + frontend_domain: QubesVM | None = None, + options: dict[str, object] | None=None, + mode: str | AssignmentMode = AssignmentMode.MANUAL, + ) -> DeviceAssignment: """Helper method to create a DeviceAssignment object.""" return cls( VirtualDevice(Port(backend_domain, port_id, devclass), device_id), @@ -1286,7 +1271,7 @@ def new( mode, ) - def clone(self, **kwargs): + def clone(self, **kwargs) -> DeviceAssignment: """ Clone object and substitute attributes with explicitly given. """ @@ -1297,23 +1282,23 @@ def clone(self, **kwargs): "frontend_domain": self.frontend_domain, } attr.update(kwargs) - return self.__class__(**attr) + return self.__class__(**attr) # type: ignore - def __repr__(self): + def __repr__(self) -> str: return f"{self.virtual_device!r}" @property - def repr_for_qarg(self): + def repr_for_qarg(self) -> str: """Object representation for qrexec argument""" return self.virtual_device.repr_for_qarg - def __str__(self): + def __str__(self) -> str: return f"{self.virtual_device}" - def __hash__(self): + def __hash__(self) -> int: return hash(self.virtual_device) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if isinstance(other, (VirtualDevice, DeviceAssignment)): result = ( self.port == other.port and self.device_id == other.device_id @@ -1321,7 +1306,7 @@ def __eq__(self, other): return result return False - def __lt__(self, other): + def __lt__(self, other: object) -> bool: if isinstance(other, DeviceAssignment): return self.virtual_device < other.virtual_device if isinstance(other, VirtualDevice): @@ -1332,7 +1317,7 @@ def __lt__(self, other): ) @property - def backend_domain(self) -> Optional[QubesVM]: + def backend_domain(self) -> QubesVM | None: # pylint: disable=missing-function-docstring return self.virtual_device.backend_domain @@ -1357,9 +1342,9 @@ def device_id(self) -> str: return self.virtual_device.device_id @property - def devices(self) -> List[DeviceInfo]: + def devices(self) -> list[DeviceInfo]: """Get DeviceInfo objects corresponding to this DeviceAssignment""" - result: List[DeviceInfo] = [] + result: list[DeviceInfo] = [] if not self.backend_domain: return result if self.port_id != "*": @@ -1399,17 +1384,17 @@ def port(self) -> Port: return Port(self.backend_domain, self.port_id, self.devclass) @property - def frontend_domain(self) -> Optional[QubesVM]: + def frontend_domain(self) -> QubesVM | None: """Which domain the device is attached/assigned to.""" return self.__frontend_domain @frontend_domain.setter - def frontend_domain(self, frontend_domain: Optional[Union[str, QubesVM]]): + def frontend_domain(self, frontend_domain: str | QubesVM | None) -> None: """Which domain the device is attached/assigned to.""" if isinstance(frontend_domain, str): if not self.backend_domain: raise ProtocolError("Cannot determine backend domain") - self.__frontend_domain: Optional[QubesVM] = ( + self.__frontend_domain: QubesVM | None = ( self.backend_domain.app.domains[frontend_domain] ) else: @@ -1448,12 +1433,12 @@ def attach_automatically(self) -> bool: ) @property - def options(self) -> Dict[str, Any]: + def options(self) -> dict[str, object]: """Device options (same as in the legacy API).""" return self.__options @options.setter - def options(self, options: Optional[Dict[str, Any]]): + def options(self, options: dict[str, object] | None) -> None: """Device options (same as in the legacy API).""" self.__options = options or {} @@ -1482,7 +1467,7 @@ def deserialize( cls, serialization: bytes, expected_device: VirtualDevice, - ) -> "DeviceAssignment": + ) -> DeviceAssignment: """ Recovers a serialized object, see: :py:meth:`serialize`. """ @@ -1497,7 +1482,7 @@ def _deserialize( cls, untrusted_serialization: bytes, expected_device: VirtualDevice, - ) -> "DeviceAssignment": + ) -> DeviceAssignment: """ Actually deserializes the object. """ diff --git a/qubesadmin/utils.py b/qubesadmin/utils.py index 68b89af5..9f6d5982 100644 --- a/qubesadmin/utils.py +++ b/qubesadmin/utils.py @@ -29,6 +29,7 @@ import re import qubesadmin.exc +from qubesadmin.exc import QubesValueError def parse_size(size): @@ -210,3 +211,25 @@ def release(self): """Unlock the file and close the file object""" fcntl.lockf(self.file, fcntl.LOCK_UN) self.file.close() + + +def qbool(value: str | int | bool) -> bool: + """ + Property setter for boolean properties. + + It accepts (case-insensitive) ``'0'``, ``'no'`` and ``false`` as + :py:obj:`False` and ``'1'``, ``'yes'`` and ``'true'`` as + :py:obj:`True`. + """ + + if isinstance(value, str): + lcvalue = value.lower() + if lcvalue in ("0", "no", "false", "off"): + return False + if lcvalue in ("1", "yes", "true", "on"): + return True + raise QubesValueError( + "Invalid literal for boolean property: {!r}".format(value) + ) + + return bool(value)