diff --git a/qubesadmin/label.py b/qubesadmin/label.py index c615bff3..9aa28cea 100644 --- a/qubesadmin/label.py +++ b/qubesadmin/label.py @@ -19,9 +19,14 @@ # with this program; if not, see . '''VM Labels''' +from __future__ import annotations +from typing import TYPE_CHECKING import qubesadmin.exc +if TYPE_CHECKING: + from qubesadmin.app import QubesBase + class Label: '''Label definition for virtual machines @@ -32,14 +37,14 @@ class Label: :param str name: label's name like "red" or "green" ''' - def __init__(self, app, name): + def __init__(self, app: QubesBase, name: str): self.app = app self._name = name - self._color = None - self._index = None + self._color: str | None = None + self._index: int | None = None @property - def color(self): + def color(self) -> str: '''color specification as in HTML (``#abcdef``)''' if self._color is None: try: @@ -51,18 +56,18 @@ def color(self): return self._color @property - def name(self): + def name(self) -> str: '''label's name like "red" or "green"''' return self._name @property - def icon(self): + def icon(self) -> str: '''freedesktop icon name, suitable for use in :py:meth:`PyQt4.QtGui.QIcon.fromTheme`''' return 'appvm-' + self.name @property - def index(self): + def index(self) -> int: '''label numeric identifier''' if self._index is None: try: @@ -73,13 +78,13 @@ def index(self): self._index = int(qubesd_response.decode()) return self._index - def __str__(self): + def __str__(self) -> str: return self._name - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if isinstance(other, Label): return self.name == other.name return NotImplemented - def __hash__(self): + def __hash__(self) -> int: return hash(self.name) diff --git a/qubesadmin/log.py b/qubesadmin/log.py index a06a2127..964ca0b7 100644 --- a/qubesadmin/log.py +++ b/qubesadmin/log.py @@ -37,7 +37,7 @@ formatter_debug = logging.Formatter(FORMAT_DEBUG) -def enable(): +def enable() -> None: '''Enable global logging Use :py:mod:`logging` module from standard library to log messages. @@ -58,7 +58,7 @@ def enable(): logging.root.setLevel(logging.INFO) -def enable_debug(): +def enable_debug() -> None: '''Enable debug logging Enable more messages and additional info to message format. diff --git a/qubesadmin/spinner.py b/qubesadmin/spinner.py index 27c4eb9a..5b26de70 100644 --- a/qubesadmin/spinner.py +++ b/qubesadmin/spinner.py @@ -39,9 +39,11 @@ import curses import io import itertools +import typing +from typing import IO -CHARSET = '-\\|/' -ENTERPRISE_CHARSET = CHARSET * 4 + '-._.-^' * 2 +CHARSET: str = '-\\|/' +ENTERPRISE_CHARSET: str = CHARSET * 4 + '-._.-^' * 2 class AbstractSpinner: '''The base class for all Spinners @@ -54,35 +56,35 @@ class AbstractSpinner: 2. zero or more calls to :py:meth:`update()` 3. exactly one call to :py:meth:`hide()` ''' - def __init__(self, stream, charset=CHARSET): + def __init__(self, stream: IO, charset: str=CHARSET): self.stream = stream self.charset = itertools.cycle(charset) - def show(self, prompt): + def show(self, prompt: str) -> None: '''Show the spinner, with a prompt :param str prompt: prompt, like "please wait" ''' raise NotImplementedError() - def hide(self): + def hide(self) -> None: '''Hide the spinner and the prompt''' raise NotImplementedError() - def update(self): + def update(self) -> None: '''Show next spinner character''' raise NotImplementedError() class DummySpinner(AbstractSpinner): '''Dummy spinner, does not do anything''' - def show(self, prompt): + def show(self, prompt: str) -> None: pass - def hide(self): + def hide(self) -> None: pass - def update(self): + def update(self) -> None: pass @@ -90,21 +92,21 @@ class QubesSpinner(AbstractSpinner): '''Basic spinner This spinner uses standard ASCII control characters''' - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.hidelen = 0 self.cub1 = '\b' - def show(self, prompt): + def show(self, prompt: str) -> None: self.hidelen = len(prompt) + 2 self.stream.write('{} {}'.format(prompt, next(self.charset))) self.stream.flush() - def hide(self): + def hide(self) -> None: self.stream.write('\r' + ' ' * self.hidelen + '\r') self.stream.flush() - def update(self): + def update(self) -> None: self.stream.write(self.cub1 + next(self.charset)) self.stream.flush() @@ -114,7 +116,7 @@ class QubesSpinnerEnterpriseEdition(QubesSpinner): This is tty- and terminfo-aware spinner. Recommended. ''' - def __init__(self, stream, charset=None): + def __init__(self, stream: IO, charset: str | None=None): # our Enterprise logic follows self.stream_isatty = stream.isatty() if charset is None: @@ -126,18 +128,20 @@ def __init__(self, stream, charset=None): try: curses.setupterm() self.has_terminfo = True - self.cub1 = curses.tigetstr('cub1').decode() + self.cub1 = typing.cast(bytes, curses.tigetstr('cub1')).decode() except (curses.error, io.UnsupportedOperation): # we are in very non-Enterprise environment self.has_terminfo = False else: self.cub1 = '' - def hide(self): + def hide(self) -> None: if self.stream_isatty: hideseq = '\r' + ' ' * self.hidelen + '\r' if self.has_terminfo: - hideseq_l = (curses.tigetstr('cr'), curses.tigetstr('clr_eol')) + hideseq_l = typing.cast( + tuple[bytes, bytes], + (curses.tigetstr('cr'), curses.tigetstr('clr_eol'))) if all(seq is not None for seq in hideseq_l): hideseq = ''.join(seq.decode() for seq in hideseq_l) else: diff --git a/qubesadmin/storage.py b/qubesadmin/storage.py index e7924b32..266d2271 100644 --- a/qubesadmin/storage.py +++ b/qubesadmin/storage.py @@ -19,12 +19,20 @@ # with this program; if not, see . """Storage subsystem.""" +from __future__ import annotations +from typing import BinaryIO, TYPE_CHECKING, IO +from collections.abc import Generator + import qubesadmin.exc +if TYPE_CHECKING: + from qubesadmin.app import QubesBase class Volume: """Storage volume.""" - def __init__(self, app, pool=None, vid=None, vm=None, vm_name=None): + def __init__(self, app: QubesBase, pool: str | None=None, + vid: str | None=None, vm: str | None=None, + vm_name: str | None=None): """Construct a Volume object. Volume may be identified using pool+vid, or vm+vm_name. Either of @@ -49,7 +57,8 @@ def __init__(self, app, pool=None, vid=None, vm=None, vm_name=None): self._vm_name = vm_name self._info = None - def _qubesd_call(self, func_name, payload=None, payload_stream=None): + def _qubesd_call(self, func_name: str, payload: bytes | None = None, + payload_stream: IO | None = None) -> bytes: """Make a call to qubesd regarding this volume :param str func_name: API function name, like `Info` or `Resize` @@ -69,6 +78,7 @@ def _qubesd_call(self, func_name, payload=None, payload_stream=None): method = 'admin.pool.volume.' + func_name dest = 'dom0' arg = self._pool + assert self._vid is not None if payload is not None: payload = self._vid.encode('ascii') + b' ' + payload else: @@ -77,7 +87,7 @@ def _qubesd_call(self, func_name, payload=None, payload_stream=None): dest, method, arg, payload=payload, payload_stream=payload_stream) - def _fetch_info(self, force=True): + def _fetch_info(self, force: bool = True) -> None: """Fetch volume properties Populate self._info dict @@ -90,27 +100,29 @@ def _fetch_info(self, force=True): info = info.decode('ascii') self._info = dict([line.split('=', 1) for line in info.splitlines()]) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if isinstance(other, Volume): return self.pool == other.pool and self.vid == other.vid return NotImplemented - def __lt__(self, other): + def __lt__(self, other: object) -> bool: # pylint: disable=protected-access if isinstance(other, Volume): if self._vm and other._vm: + assert self._vm_name is not None and other._vm_name is not None return (self._vm, self._vm_name) < (other._vm, other._vm_name) if self._vid and other._vid: + assert self._pool is not None and other._pool is not None return (self._pool, self._vid) < (other._pool, other._vid) return NotImplemented @property - def name(self): + def name(self) -> str | None: """per-VM volume name, if available""" return self._vm_name @property - def pool(self): + def pool(self) -> str: """Storage volume pool name.""" if self._pool is not None: return self._pool @@ -118,10 +130,11 @@ def pool(self): self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('pool') + assert self._info is not None return str(self._info['pool']) @property - def vid(self): + def vid(self) -> str: """Storage volume id, unique within given pool.""" if self._vid is not None: return self._vid @@ -129,76 +142,83 @@ def vid(self): self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('vid') + assert self._info is not None return str(self._info['vid']) @property - def size(self): + def size(self) -> int: """Size of volume, in bytes.""" try: self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('size') + assert self._info is not None return int(self._info['size']) @property - def usage(self): + def usage(self) -> int: """Used volume space, in bytes.""" try: self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('usage') + assert self._info is not None return int(self._info['usage']) @property - def rw(self): + def rw(self) -> bool: """True if volume is read-write.""" try: self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('rw') + assert self._info is not None return self._info['rw'] == 'True' @rw.setter - def rw(self, value): + def rw(self, value: object) -> None: """Set rw property""" self._qubesd_call('Set.rw', str(value).encode('ascii')) self._info = None @property - def ephemeral(self): + def ephemeral(self) -> bool: """True if volume is read-write.""" try: self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('ephemeral') + assert self._info is not None return self._info.get('ephemeral', 'False') == 'True' @ephemeral.setter - def ephemeral(self, value): + def ephemeral(self, value: object) -> None: """Set rw property""" self._qubesd_call('Set.ephemeral', str(value).encode('ascii')) self._info = None @property - def snap_on_start(self): + def snap_on_start(self) -> bool: """Create a snapshot from source on VM start.""" try: self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('snap_on_start') + assert self._info is not None return self._info['snap_on_start'] == 'True' @property - def save_on_stop(self): + def save_on_stop(self) -> bool: """Commit changes to original volume on VM stop.""" try: self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('save_on_stop') + assert self._info is not None return self._info['save_on_stop'] == 'True' @property - def source(self): + def source(self) -> str | None: """Volume ID of source volume (for :py:attr:`snap_on_start`). If None, this volume itself will be used. @@ -207,26 +227,28 @@ def source(self): self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('source') + assert self._info is not None if self._info['source']: return self._info['source'] return None @property - def revisions_to_keep(self): + def revisions_to_keep(self) -> int: """Number of revisions to keep around""" try: self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('revisions_to_keep') + assert self._info is not None return int(self._info['revisions_to_keep']) @revisions_to_keep.setter - def revisions_to_keep(self, value): + def revisions_to_keep(self, value: object) -> None: """Set revisions_to_keep property""" self._qubesd_call('Set.revisions_to_keep', str(value).encode('ascii')) self._info = None - def is_outdated(self): + def is_outdated(self) -> bool: """Returns `True` if this snapshot of a source volume (for `snap_on_start`=True) is outdated. """ @@ -234,9 +256,10 @@ def is_outdated(self): self._fetch_info() except qubesadmin.exc.QubesDaemonAccessError: raise qubesadmin.exc.QubesPropertyAccessError('is_outdated') + assert self._info is not None return self._info.get('is_outdated', False) == 'True' - def resize(self, size): + def resize(self, size: object) -> None: """Resize volume. Currently only extending is supported. @@ -246,12 +269,12 @@ def resize(self, size): self._qubesd_call('Resize', str(size).encode('ascii')) @property - def revisions(self): + def revisions(self) -> list[str]: """ Returns iterable containing revision identifiers""" revisions = self._qubesd_call('ListSnapshots') return revisions.decode('ascii').splitlines() - def revert(self, revision): + def revert(self, revision: str) -> None: """ Revert volume to previous revision :param str revision: Revision identifier to revert to @@ -260,7 +283,7 @@ def revert(self, revision): raise TypeError('revision must be a str') self._qubesd_call('Revert', revision.encode('ascii')) - def import_data(self, stream): + def import_data(self, stream: BinaryIO) -> None: """ Import volume data from a given file-like object. This function overrides existing volume content. @@ -269,7 +292,7 @@ def import_data(self, stream): """ self._qubesd_call('Import', payload_stream=stream) - def import_data_with_size(self, stream, size): + def import_data_with_size(self, stream: IO, size: object) -> None: """ Import volume data from a given file-like object, informing qubesd that data has a specific size. @@ -283,11 +306,11 @@ def import_data_with_size(self, stream, size): 'ImportWithSize', payload=size_line.encode(), payload_stream=stream) - def clear_data(self): + def clear_data(self) -> None: """ Clear existing volume content. """ self._qubesd_call('Clear') - def clone(self, source): + def clone(self, source: Volume) -> None: """ Clone data from sane volume of another VM. This function override existing volume content. @@ -309,7 +332,7 @@ class Pool: """ A Pool is used to manage different kind of volumes (File based/LVM/Btrfs/...). """ - def __init__(self, app, name=None): + def __init__(self, app: QubesBase, name: str | None=None): """ Initialize storage pool wrapper :param app: Qubes() object @@ -319,23 +342,25 @@ def __init__(self, app, name=None): self.name = name self._config = None - def __str__(self): + def __str__(self) -> str: + assert self.name is not None return self.name - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if isinstance(other, Pool): return self.name == other.name if isinstance(other, str): return self.name == other return NotImplemented - def __lt__(self, other): + def __lt__(self, other: object) -> bool: if isinstance(other, Pool): + assert self.name is not None and other.name is not None return self.name < other.name return NotImplemented @property - def usage_details(self): + def usage_details(self) -> dict[str, int]: """ Storage pool usage details (current - not cached) """ try: pool_usage_data = self.app.qubesd_call( @@ -346,14 +371,14 @@ def usage_details(self): assert pool_usage_data.endswith('\n') or pool_usage_data == '' pool_usage_data = pool_usage_data[:-1] - def _int_split(text): # pylint: disable=missing-docstring + def _int_split(text: str) -> tuple[str, int]: # pylint: disable=missing-docstring key, value = text.split("=", 1) return key, int(value) return dict(_int_split(l) for l in pool_usage_data.splitlines()) @property - def config(self): + def config(self) -> dict[str, str]: """ Storage pool config """ if self._config is None: try: @@ -369,7 +394,7 @@ def config(self): return self._config @property - def size(self): + def size(self) -> int | None: """ Storage pool size, in bytes""" try: return int(self.usage_details['data_size']) @@ -378,7 +403,7 @@ def size(self): return None @property - def usage(self): + def usage(self) -> int | None: """ Space used in the pool, in bytes """ try: return int(self.usage_details['data_usage']) @@ -387,17 +412,17 @@ def usage(self): return None @property - def driver(self): + def driver(self) -> str: """ Storage pool driver """ return self.config['driver'] @property - def revisions_to_keep(self): + def revisions_to_keep(self) -> int: """Number of revisions to keep around""" return int(self.config['revisions_to_keep']) @revisions_to_keep.setter - def revisions_to_keep(self, value): + def revisions_to_keep(self, value: object) -> None: """Set revisions_to_keep property""" self.app.qubesd_call( 'dom0', @@ -407,13 +432,13 @@ def revisions_to_keep(self, value): self._config = None @property - def ephemeral_volatile(self): + def ephemeral_volatile(self) -> bool: """Whether volatile volumes in this pool should be encrypted with an ephemeral key in dom0""" return bool(self.config['ephemeral_volatile']) @ephemeral_volatile.setter - def ephemeral_volatile(self, value): + def ephemeral_volatile(self, value: object) -> None: """Set ephemeral_volatile property""" self.app.qubesd_call( 'dom0', @@ -423,7 +448,7 @@ def ephemeral_volatile(self, value): self._config = None @property - def volumes(self): + def volumes(self) -> Generator[Volume]: """ Volumes managed by this pool """ try: volumes_data = self.app.qubesd_call( diff --git a/qubesadmin/tags.py b/qubesadmin/tags.py index 128adc00..4961a473 100644 --- a/qubesadmin/tags.py +++ b/qubesadmin/tags.py @@ -19,6 +19,12 @@ # with this program; if not, see . '''VM tags interface''' +from __future__ import annotations +from collections.abc import Iterator + +import typing +if typing.TYPE_CHECKING: + from qubesadmin.vm import QubesVM class Tags: @@ -29,37 +35,37 @@ class Tags: `-`. ''' - def __init__(self, vm): + def __init__(self, vm: QubesVM): super().__init__() self.vm = vm - def remove(self, elem): + def remove(self, elem: str) -> None: '''Remove a tag''' self.vm.qubesd_call(self.vm.name, 'admin.vm.tag.Remove', elem) - def add(self, elem): + def add(self, elem: str) -> None: '''Add a tag''' self.vm.qubesd_call(self.vm.name, 'admin.vm.tag.Set', elem) - def update(self, *others): + def update(self, *others) -> None: '''Add tags from iterable(s)''' for other in others: for elem in other: self.add(elem) - def discard(self, elem): + def discard(self, elem: str) -> None: '''Remove a tag if present''' try: self.remove(elem) except KeyError: pass - def __iter__(self): + def __iter__(self) -> Iterator[str]: qubesd_response = self.vm.qubesd_call(self.vm.name, 'admin.vm.tag.List') return iter(qubesd_response.decode('utf-8').splitlines()) - def __contains__(self, elem): + def __contains__(self, elem: str) -> bool: '''Does the VM have a tag''' response = self.vm.qubesd_call(self.vm.name, 'admin.vm.tag.Get', elem) return response == b'1' diff --git a/qubesadmin/utils.py b/qubesadmin/utils.py index 9f6d5982..a8131d54 100644 --- a/qubesadmin/utils.py +++ b/qubesadmin/utils.py @@ -24,15 +24,23 @@ """Various utility functions.""" +from __future__ import annotations + import fcntl import os import re +import typing +from collections.abc import Iterable import qubesadmin.exc from qubesadmin.exc import QubesValueError +if typing.TYPE_CHECKING: + from qubesadmin.app import QubesBase + from qubesadmin.vm import QubesVM + -def parse_size(size): +def parse_size(size: str) -> int: """Parse human readable size into bytes.""" units = [ ('K', 1000), ('KB', 1000), @@ -55,28 +63,28 @@ def parse_size(size): raise qubesadmin.exc.QubesException("Invalid size: {0}.".format(size)) -def mbytes_to_kmg(size): +def mbytes_to_kmg(size: float | int) -> str: """Convert mbytes to human readable format.""" if size > 1024: return "%d GiB" % (size / 1024) return "%d MiB" % size -def kbytes_to_kmg(size): +def kbytes_to_kmg(size: float | int) -> str: """Convert kbytes to human readable format.""" if size > 1024: return mbytes_to_kmg(size / 1024) return "%d KiB" % size -def bytes_to_kmg(size): +def bytes_to_kmg(size: int) -> str: """Convert bytes to human readable format.""" if size > 1024: return kbytes_to_kmg(size / 1024) return "%d B" % size -def size_to_human(size): +def size_to_human(size: int) -> str: """Humane readable size, with 1/10 precision""" if size < 1024: return str(size) @@ -87,26 +95,11 @@ def size_to_human(size): return str(round(size / (1024.0 * 1024 * 1024), 1)) + ' GiB' -def get_entry_point_one(group, name): - """Get a single entry point of given type, - raise TypeError when there are multiple. - """ - import importlib.metadata - epoints = tuple(importlib.metadata.entry_points(group=group, name=name)) - if not epoints: - raise KeyError(name) - if len(epoints) > 1: - raise TypeError('more than 1 implementation of {!r} found: {}'.format( - name, ', '.join('{}.{}'.format(ep.module_name, '.'.join(ep.attrs)) - for ep in epoints))) - return epoints[0].load() - - UPDATES_DEFAULT_VM_DISABLE_FLAG = \ '/var/lib/qubes/updates/vm-default-disable-updates' -def updates_vms_status(qvm_collection): +def updates_vms_status(qvm_collection: QubesBase) -> bool | None: """Check whether all VMs have the same check-updates value; if yes, return it; otherwise, return None """ @@ -122,7 +115,8 @@ def updates_vms_status(qvm_collection): return status -def vm_dependencies(app, reference_vm): +def vm_dependencies(app: QubesBase, reference_vm: QubesVM)\ + -> list[tuple[QubesVM | None, str]]: """Helper function that returns a list of all the places a given VM is used in. Output is a list of tuples (property_holder, property_name), with None as property_holder for global properties @@ -169,12 +163,12 @@ def vm_dependencies(app, reference_vm): return result -def encode_for_vmexec(args): +def encode_for_vmexec(args: Iterable[str]) -> str: """ Encode an argument list for qubes.VMExec call. """ - def encode(part): + def encode(part: re.Match) -> bytes: if part.group(0) == b'-': return b'--' return '-{:02X}'.format(ord(part.group(0))).encode('ascii') @@ -188,26 +182,28 @@ def encode(part): class LockFile: """Simple locking context manager. It opens a file with an advisory lock taken (fcntl.lockf)""" - def __init__(self, path, nonblock=False): + def __init__(self, path: str, nonblock: bool=False): """Open the file. Call *acquire* or enter the context to lock the file""" # pylint: disable=consider-using-with self.file = open(path, "w", encoding='ascii') self.nonblock = nonblock - def __enter__(self, *args, **kwargs): + def __enter__(self, *args, **kwargs) -> LockFile: self.acquire() return self - def acquire(self): + def acquire(self) -> None: """Lock the opened file""" fcntl.lockf(self.file, fcntl.LOCK_EX | (fcntl.LOCK_NB if self.nonblock else 0)) - def __exit__(self, exc_type=None, exc_value=None, traceback=None): + def __exit__(self, exc_type: object | None = None, + exc_value: object | None = None, + traceback: object | None = None) -> None: self.release() - def release(self): + def release(self) -> None: """Unlock the file and close the file object""" fcntl.lockf(self.file, fcntl.LOCK_UN) self.file.close()