diff --git a/qubesadmin/tests/tools/qvm_ls.py b/qubesadmin/tests/tools/qvm_ls.py index 23a1d6f0..6a837794 100644 --- a/qubesadmin/tests/tools/qvm_ls.py +++ b/qubesadmin/tests/tools/qvm_ls.py @@ -21,8 +21,6 @@ # pylint: disable=missing-docstring -import unittest - import qubesadmin import qubesadmin.vm import qubesadmin.tools.qvm_ls @@ -44,34 +42,13 @@ def __init__(self): class TC_00_Column(qubesadmin.tests.QubesTestCase): def test_100_init(self): + '''Column registers itself in Column.columns on init.''' try: - testcolumn = qubesadmin.tools.qvm_ls.Column('TESTCOLUMN') - self.assertEqual(testcolumn.ls_head, 'TESTCOLUMN') + testcolumn = qubesadmin.tools.qvm_ls.Column( + 'TESTCOLUMN', attr=lambda vm: None) + self.assertEqual(testcolumn.head, 'TESTCOLUMN') finally: - try: - qubesadmin.tools.qvm_ls.Column.columns['TESTCOLUMN'] - except KeyError: - pass - - -class TC_10_globals(qubesadmin.tests.QubesTestCase): - def test_100_simple_flag(self): - flag = qubesadmin.tools.qvm_ls.simple_flag(1, 'T', 'internal') - - # TODO after serious testing of QubesVM and Qubes app, this should be - # using normal components - vm = TestVM('test-vm', internal=False) - - self.assertFalse(flag(None, vm)) - vm.internal = True - self.assertTrue(flag(None, vm)) - - @unittest.skip('column list generated dynamically') - def test_900_formats_columns(self): - for cols in qubesadmin.tools.qvm_ls.formats.values(): - for col in cols: - self.assertIn(col.upper(), - qubesadmin.tools.qvm_ls.Column.columns) + del qubesadmin.tools.qvm_ls.Column.columns['TESTCOLUMN'] class TC_50_List(qubesadmin.tests.QubesTestCase): @@ -228,6 +205,37 @@ def test_110_network_tree(self): ' └─test-vm-3 Running TestVM - - test-vm-proxy\n' '└─test-vm-4 Running TestVM - - test-vm-net-2\n') + def test_105_flags(self): + '''FLAGS column encodes type, power state and boolean attributes.''' + app = TestApp() + vm = app.domains['test-vm'] + vm.klass = 'AppVM' + vm.updateable = True + vm.provides_network = False + vm.installed_by_rpm = False + vm.internal = False + vm.debug = True + vm.autostart = False + with qubesadmin.tests.tools.StdoutBuffer() as stdout: + qubesadmin.tools.qvm_ls.main(['--fields', 'name,flags', + 'test-vm'], app=app) + # arU---D-: AppVM, running, updateable, debug + self.assertEqual(stdout.getvalue(), + 'NAME FLAGS\n' + 'test-vm arU---D-\n') + + def test_106_raw_data(self): + '''--raw-data produces pipe-separated values with no header.''' + app = TestApp() + app.domains['test-vm'].klass = 'AppVM' + with qubesadmin.tests.tools.StdoutBuffer() as stdout: + qubesadmin.tools.qvm_ls.main(['--raw-data', '--fields=name,class', + 'dom0', 'test-vm'], app=app) + self.assertEqual(stdout.getvalue(), + 'dom0|TestVM\n' + 'test-vm|AppVM\n') + + class TC_70_Tags(qubesadmin.tests.QubesTestCase): def setUp(self): self.app = TestApp() @@ -427,6 +435,7 @@ def setUp(self): ) def test_101_sort_string(self): + '''--sort with --reverse and --ignore-case sorts case-insensitively.''' with qubesadmin.tests.tools.StdoutBuffer() as stdout: qubesadmin.tools.qvm_ls.main( ['--sort', 'NAME', '--reverse', '--ignore-case'], app=self.app) @@ -438,6 +447,7 @@ def test_101_sort_string(self): 'a Running TestVM red - -\n') def test_102_sort_numeric(self): + '''Numeric columns are sorted by value, not lexicographically.''' with qubesadmin.tests.tools.StdoutBuffer() as stdout: qubesadmin.tools.qvm_ls.main( ['--field', 'NAME,MAXMEM', '--sort', 'MAXMEM'], app=self.app) @@ -448,6 +458,18 @@ def test_102_sort_numeric(self): 'c 300\n' 'B 1000\n') + def test_103_sort_column_not_in_output(self): + '''--sort on a column absent from output leaves order unchanged.''' + with qubesadmin.tests.tools.StdoutBuffer() as stdout: + qubesadmin.tools.qvm_ls.main( + ['--fields', 'NAME,CLASS', '--sort', 'STATE'], app=self.app) + self.assertEqual(stdout.getvalue(), + 'NAME CLASS\n' + 'B TestVM\n' + 'a TestVM\n' + 'c TestVM\n' + 'dom0 TestVM\n') + class TC_110_Filtering(qubesadmin.tests.QubesTestCase): def test_111_filter_class(self): diff --git a/qubesadmin/tools/qvm_ls.py b/qubesadmin/tools/qvm_ls.py index 3a9ae3dd..5134c8a8 100644 --- a/qubesadmin/tools/qvm_ls.py +++ b/qubesadmin/tools/qvm_ls.py @@ -30,6 +30,7 @@ import collections.abc import sys import textwrap +from collections.abc import Callable import qubesadmin import qubesadmin.spinner @@ -37,68 +38,42 @@ import qubesadmin.utils import qubesadmin.vm import qubesadmin.exc - -# -# columns -# +from qubesadmin.vm import QubesVM class Column: - '''A column in qvm-ls output characterised by its head and a way - to fetch a parameter describing the domain. + """A column in qvm-ls output. :param str head: Column head (usually uppercase). - :param str attr: Attribute, possibly complex (containing ``.``). This may \ - also be a callable that gets as its only argument the domain. + :param attr: Attribute path (dotted string) or callable ``(vm) -> value``. :param str doc: Description of column (will be visible in --help-columns). - ''' + """ #: collection of all columns columns = {} - def __init__(self, head, attr=None, doc=None): - self.ls_head = head + def __init__(self, head: str, + attr: str | Callable[[QubesVM], object], + doc: str | None=None): + self.head = head self.__doc__ = doc - - # intentionally not always do set self._attr, - # to cause AttributeError in self.format() - if attr is not None: - self._attr = attr - - self.__class__.columns[self.ls_head] = self - + self._attr = attr + self.__class__.columns[self.head] = self def cell(self, vm, insertion=0): - '''Format one cell. - - .. note:: - - This is only for technical formatting (filling with space). If you - want to subclass the :py:class:`Column` class, you should override - :py:meth:`Column.format` method instead. + '''Format one cell, handling tree indentation for the NAME column. - :param qubes.vm.qubesvm.QubesVM: Domain to get a value from. - :param int insertion: Intending to shift the value to the right. + :param vm: Domain to get a value from. + :param insertion: Tree depth; shifts NAME value to the right. :returns: string to display - :rtype: str ''' - value = self.format(vm) or '-' - if insertion > 0 and self.ls_head == 'NAME': + if insertion > 0 and self.head == 'NAME': value = '└─' + value value = ' ' * (insertion-1) + value return value - - def format(self, vm): - '''Format one cell value. - - Return value to put in a table cell. - - :param qubes.vm.qubesvm.QubesVM: Domain to get a value from. - :returns: Value to put, or :py:obj:`None` if no value. - :rtype: str or None - ''' - + def format(self, vm: QubesVM) -> str | None: + '''Return the cell value for *vm*, or ``None`` if not applicable.''' ret = None try: if isinstance(self._attr, str): @@ -120,189 +95,79 @@ def format(self, vm): return str(ret) def __repr__(self): - return '{}(head={!r})'.format(self.__class__.__name__, - self.ls_head) - - - def __eq__(self, other): - return self.ls_head == other.ls_head + return '{}(head={!r})'.format(self.__class__.__name__, self.head) + def __eq__(self, other: object) -> bool: + if isinstance(other, Column): + return self.head == other.head + return NotImplemented - def __lt__(self, other): - return self.ls_head < other.ls_head + def __lt__(self, other: object) -> bool: + if isinstance(other, Column): + return self.head < other.head + return NotImplemented class PropertyColumn(Column): - '''Column that displays value from property (:py:class:`property` or - :py:class:`qubes.property`) of domain. + """Column that displays a VM property by name. :param name: Name of VM property. - ''' + """ def __init__(self, name): - ls_head = name.replace('_', '-').upper() - super().__init__(head=ls_head, attr=name) - - def __repr__(self): - return '{}(head={!r}'.format( - self.__class__.__name__, - self.ls_head) - - -def process_vm(vm): - '''Process VM object to find all listable properties. - - :param qubesmgmt.vm.QubesVM vm: VM object. + super().__init__(head=name.replace('_', '-').upper(), attr=name) + + def __repr__(self) -> str: + return '{}(head={!r}'.format(self.__class__.__name__, self.head) + + +def _format_flags(vm: QubesVM) -> str: + '''Format FLAGS column value for a single VM. + + Each character position encodes one property: + 1 type: 0=AdminVM, a/A=AppVM, d/D=DispVM, s/S=StandaloneVM, + t/T=TemplateVM + (uppercase = HVM) + 2 power state: r=running, t=transient, p=paused, s=suspended, + h=halting, d=dying, c=crashed, ?=unknown + 3 U updateable + 4 N provides_network + 5 R installed_by_rpm + 6 i internal + 7 D debug + 8 A autostart ''' + type_codes = { + 'AdminVM': '0', 'TemplateVM': 't', 'AppVM': 'a', + 'StandaloneVM': 's', 'DispVM': 'd', + } + type_letter = type_codes.get(vm.klass, '-') + if type_letter not in ('0', '-'): + if getattr(vm, 'virt_mode', 'pv') == 'hvm': + type_letter = type_letter.upper() + + state = vm.get_power_state().lower() + if state == 'unknown': + power_letter = '?' + elif state in ('running', 'transient', 'paused', 'suspended', + 'halting', 'dying', 'crashed'): + power_letter = state[0] + else: + power_letter = '-' - for prop_name in vm.property_list(): - PropertyColumn(prop_name) - - -def flag(field): - '''Mark method as flag field. - - :param int field: Which field to fill (counted from 1) - ''' - - def decorator(obj): - # pylint: disable=missing-docstring - obj.field = field - return obj - return decorator - - -def simple_flag(field, letter, attr, doc=None): - '''Create simple, binary flag. - - :param str attr: Attribute name to check. If result is true, flag is fired. - :param str letter: The letter to show. - ''' - - def helper(self, vm): - # pylint: disable=missing-docstring,unused-argument - try: - value = getattr(vm, attr) - except AttributeError: - value = False - - if value: - return letter[0] - - helper.__doc__ = doc - helper.field = field - return helper - - -class FlagsColumn(Column): - '''Some fancy flags that describe general status of the domain.''' - - def __init__(self): - super().__init__(head='FLAGS', doc=self.__class__.__doc__) - - - @flag(1) - def type(self, vm): - '''Type of domain. - - 0 AdminVM (AKA Dom0) - aA AppVM - dD DisposableVM - sS StandaloneVM - tT TemplateVM - - When it is HVM (optimised VM), the letter is capital. - ''' - - type_codes = { - 'AdminVM': '0', - 'TemplateVM': 't', - 'AppVM': 'a', - 'StandaloneVM': 's', - 'DispVM': 'd', - } - ret = type_codes.get(vm.klass, None) - if ret == '0': - return ret - - if ret is not None: - if getattr(vm, 'virt_mode', 'pv') == 'hvm': - return ret.upper() - return ret - - - @flag(2) - def power(self, vm): - '''Current power state. - - r running - t transient - p paused - s suspended - h halting - d dying - c crashed - ? unknown - ''' - - state = vm.get_power_state().lower() - if state == 'unknown': - return '?' - if state in ('running', 'transient', 'paused', 'suspended', - 'halting', 'dying', 'crashed'): - return state[0] - - - updateable = simple_flag(3, 'U', 'updateable', - doc='If the domain is updateable.') - - provides_network = simple_flag(4, 'N', 'provides_network', - doc='If the domain provides network.') - - installed_by_rpm = simple_flag(5, 'R', 'installed_by_rpm', - doc='If the domain is installed by RPM.') - - internal = simple_flag(6, 'i', 'internal', - doc='If the domain is internal (not normally shown, no appmenus).') - - debug = simple_flag(7, 'D', 'debug', - doc='If the domain is being debugged.') - - autostart = simple_flag(8, 'A', 'autostart', - doc='If the domain is marked for autostart.') - - # TODO (not sure if really): - # include in backups - # uses_custom_config - - def _no_flag(self, vm): - '''Reserved for future use.''' - - - @classmethod - def get_flags(cls): - '''Get all flags as list. - - Holes between flags are filled with :py:meth:`_no_flag`. - - :rtype: list - ''' - - flags = {} - for mycls in cls.__mro__: - for attr in mycls.__dict__.values(): - if not hasattr(attr, 'field'): - continue - if attr.field in flags: - continue - flags[attr.field] = attr - - return [(flags[i] if i in flags else cls._no_flag) - for i in range(1, max(flags) + 1)] - + def bool_flag(attr: str, letter: str) -> str: + return letter if getattr(vm, attr, False) else '-' - def format(self, vm): - return ''.join((flag(self, vm) or '-') for flag in self.get_flags()) + return ''.join([ + type_letter, + power_letter, + bool_flag('updateable', 'U'), + bool_flag('provides_network', 'N'), + bool_flag('installed_by_rpm', 'R'), + bool_flag('internal', 'i'), + bool_flag('debug', 'D'), + bool_flag('autostart', 'A'), + ]) def calc_size(vm, volume_name): @@ -388,11 +253,13 @@ def calc_used(vm, volume_name): doc='Disk utilisation by root image as a percentage of available space.') -FlagsColumn() +Column('FLAGS', attr=_format_flags, + doc='Various flags: type, power state, updateable, provides_network, ' + 'installed_by_rpm, internal, debug, autostart.') # Sorting columns based on numeric or string (default) values -SORT_NUMERIC = ['MEMORY', 'DISK', 'PRIV-CURR', 'PRIV-MAX', 'ROOT-CURR', 'XID', \ - 'ROOT-MAX', 'MAXMEM', 'QREXEC-TIMEOUT', 'SHUTDOWN-TIMEOUT', \ +SORT_NUMERIC = ['MEMORY', 'DISK', 'PRIV-CURR', 'PRIV-MAX', 'ROOT-CURR', 'XID', + 'ROOT-MAX', 'MAXMEM', 'QREXEC-TIMEOUT', 'SHUTDOWN-TIMEOUT', 'VCPUS', 'PRIV-USED', 'ROOT-USED'] class Table: @@ -416,13 +283,13 @@ def __init__(self, domains, colnames, spinner, *, raw_data=False, def get_head(self): '''Get table head data (all column heads).''' - return [col.ls_head for col in self.columns] + return [col.head for col in self.columns] def get_row(self, vm, insertion=0): '''Get single table row data (all columns for one domain).''' ret = [] for col in self.columns: - if self.tree_sorted and col.ls_head == 'NAME': + if self.tree_sorted and col.head == 'NAME': ret.append(col.cell(vm, insertion)) else: ret.append(col.cell(vm)) @@ -558,13 +425,13 @@ def __init__(self, help=help) def __call__(self, parser, namespace, values, option_string=None): - width = max(len(column.ls_head) for column in Column.columns.values()) + width = max(len(column.head) for column in Column.columns.values()) wrapper = textwrap.TextWrapper(width=80, initial_indent=' ', subsequent_indent=' ' * (width + 6)) text = 'Available columns:\n' + '\n'.join( wrapper.fill('{head:{width}s} {doc}'.format( - head=column.ls_head, + head=column.head, doc=column.__doc__ or '', width=width)) for column in sorted(Column.columns.values()))