Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ https://www.qubes-os.org/doc/admin-api/ for protocol specification.
Compatibility
=============

This package requires Python >= 3.5
This package requires Python >= 3.11
1 change: 0 additions & 1 deletion qubesadmin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
Expand Down
31 changes: 15 additions & 16 deletions qubesadmin/app.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# -*- encoding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
Expand Down Expand Up @@ -256,8 +255,8 @@ def add_pool(self, name, driver, **kwargs):
see :py:meth:`pool_driver_parameters` for a list
"""
# sort parameters only to ease testing, not required by API
payload = "name={}\n".format(name) + "".join(
"{}={}\n".format(key, value)
payload = f"name={name}\n" + "".join(
f"{key}={value}\n"
for key, value in sorted(kwargs.items())
)
self.qubesd_call(
Expand Down Expand Up @@ -356,13 +355,13 @@ def add_new_vm(
raise ValueError("only one of pool= and pools= can be used")

method_prefix = "admin.vm.Create."
payload = "name={} label={}".format(name, label)
payload = f"name={name} label={label}"
if pool:
payload += " pool={}".format(str(pool))
payload += f" pool={str(pool)}"
method_prefix = "admin.vm.CreateInPool."
if pools:
payload += "".join(
" pool:{}={}".format(vol, str(pool))
f" pool:{vol}={str(pool)}"
for vol, pool in sorted(pools.items())
)
method_prefix = "admin.vm.CreateInPool."
Expand Down Expand Up @@ -445,13 +444,13 @@ def clone_vm(
pools[volume.name] = volume.pool

method_prefix = "admin.vm.Create."
payload = "name={} label={}".format(new_name, label)
payload = f"name={new_name} label={label}"
if pool:
payload += " pool={}".format(str(pool))
payload += f" pool={str(pool)}"
method_prefix = "admin.vm.CreateInPool."
if pools:
payload += "".join(
" pool:{}={}".format(vol, str(pool))
f" pool:{vol}={str(pool)}"
for vol, pool in sorted(pools.items())
)
method_prefix = "admin.vm.CreateInPool."
Expand Down Expand Up @@ -483,7 +482,7 @@ def clone_vm(
pass
except qubesadmin.exc.QubesException as e:
dst_vm.log.error(
"Failed to set {!s} property: {!s}".format(prop, e)
f"Failed to set {prop!s} property: {e!s}"
)
if not ignore_errors:
raise
Expand All @@ -495,7 +494,7 @@ def clone_vm(
dst_vm.tags.add(tag)
except qubesadmin.exc.QubesException as e:
dst_vm.log.error(
"Failed to add {!s} tag: {!s}".format(tag, e)
f"Failed to add {tag!s} tag: {e!s}"
)
if not ignore_errors:
raise
Expand All @@ -510,7 +509,7 @@ def clone_vm(
dst_vm.features[feature] = value
except qubesadmin.exc.QubesException as e:
dst_vm.log.error(
"Failed to set {!s} feature: {!s}".format(feature, e)
f"Failed to set {feature!s} feature: {e!s}"
)
if not ignore_errors:
raise
Expand All @@ -521,7 +520,7 @@ def clone_vm(
dst_vm.set_notes(vm_notes)
except qubesadmin.exc.QubesException as e:
dst_vm.log.error(
'Failed to clone qube notes: {!s}'.format(e))
f'Failed to clone qube notes: {e!s}')
if not ignore_errors:
raise

Expand Down Expand Up @@ -596,7 +595,7 @@ def clone_vm(
if ignore_volumes and dst_volume.name in ignore_volumes:
continue
src_volume = src_vm.volumes[dst_volume.name]
dst_vm.log.info("Cloning {} volume".format(dst_volume.name))
dst_vm.log.info(f"Cloning {dst_volume.name} volume")
dst_volume.clone(src_volume)

except qubesadmin.exc.QubesException:
Expand Down Expand Up @@ -840,7 +839,7 @@ def qubesd_call(
)
if not os.path.exists(method_path):
raise qubesadmin.exc.QubesDaemonCommunicationError(
"{} not found".format(method_path)
f"{method_path} not found"
)
command = [
"env",
Expand All @@ -859,7 +858,7 @@ def qubesd_call(
try:
client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client_socket.connect(qubesadmin.config.QUBESD_SOCKET)
except (IOError, OSError) as e:
except OSError as e:
raise qubesadmin.exc.QubesDaemonCommunicationError(
"Failed to connect to qubesd service: %s", str(e)
)
Expand Down
1 change: 0 additions & 1 deletion qubesadmin/backup/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
Expand Down
8 changes: 3 additions & 5 deletions qubesadmin/backup/core2.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
Expand Down Expand Up @@ -229,7 +228,7 @@ def set_netvm_dependency(self, element: _Element) -> None:

if dispvm_netvm != self.globals['default_netvm']:
if dispvm_netvm:
dispvm_tpl_name = 'disp-{}'.format(dispvm_netvm)
dispvm_tpl_name = f'disp-{dispvm_netvm}'
else:
dispvm_tpl_name = 'disp-no-netvm'

Expand Down Expand Up @@ -303,7 +302,7 @@ def import_core2_vm(self, element: _Element) -> None:
if value and value.lower() == "none":
value = None
value_is_default = element.get(
"uses_default_{}".format(attr))
f"uses_default_{attr}")
if value_is_default and value_is_default.lower() != \
"true":
vm.properties[attr] = value
Expand Down Expand Up @@ -354,8 +353,7 @@ def load(self) -> bool | None:
try:
# pylint: disable=no-member
tree = lxml.etree.parse(fh)
except (EnvironmentError, # pylint: disable=broad-except
xml.parsers.expat.ExpatError) as err:
except (OSError, xml.parsers.expat.ExpatError) as err:
self.log.error(err)
return False

Expand Down
6 changes: 2 additions & 4 deletions qubesadmin/backup/core3.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
Expand Down Expand Up @@ -80,7 +79,7 @@ def get_property(xml_obj: _Element, prop: str) -> str | None:
Object can be any PropertyHolder serialized to XML - in practice
:py:class:`BaseVM` or :py:class:`Qubes`.
'''
xml_prop = xml_obj.findall('./property[@name=\'{}\']'.format(prop))
xml_prop = xml_obj.findall(f'./property[@name=\'{prop}\']')
if not xml_prop:
raise KeyError(prop)
return xml_prop[0].text
Expand Down Expand Up @@ -165,8 +164,7 @@ def load(self) -> bool | None:
try:
# pylint: disable=no-member
tree = lxml.etree.parse(fh)
except (EnvironmentError, # pylint: disable=broad-except
xml.parsers.expat.ExpatError) as err:
except (OSError, xml.parsers.expat.ExpatError) as err:
self.log.error(err)
return False

Expand Down
4 changes: 2 additions & 2 deletions qubesadmin/backup/dispvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ def run(self) -> bytes | None:
)
if exit_code != 0:
raise qubesadmin.exc.BackupRestoreError(
'qvm-backup-restore failed with {}'.format(exit_code),
f'qvm-backup-restore failed with {exit_code}',
backup_log=backup_log)
return backup_log
except subprocess.CalledProcessError as e:
Expand All @@ -350,7 +350,7 @@ def run(self) -> bytes | None:
except: # pylint: disable=bare-except
backup_log = None
raise qubesadmin.exc.BackupRestoreError(
'qvm-backup-restore failed with {}'.format(e.returncode),
f'qvm-backup-restore failed with {e.returncode}',
backup_log=backup_log)
finally:
if self.dispvm is not None:
Expand Down
32 changes: 16 additions & 16 deletions qubesadmin/backup/restore.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
Expand Down Expand Up @@ -47,7 +46,8 @@

import collections
from subprocess import Popen
from typing import Callable, TypeVar, Iterable, IO, Generator
from typing import TypeVar, IO
from collections.abc import Callable, Iterable, Generator

import qubesadmin
import qubesadmin.vm
Expand Down Expand Up @@ -208,7 +208,7 @@ def load(self, untrusted_header_text: bytes) -> None:
continue
header = self.known_headers[key]
if key in seen:
raise QubesException("Duplicated header line: {}".format(key))
raise QubesException(f"Duplicated header line: {key}")
seen.add(key)
if getattr(self, header.field, None) is not None:
# ignore options already set (potentially forced values)
Expand All @@ -234,7 +234,7 @@ def load(self, untrusted_header_text: bytes) -> None:
"Unusual compression filter '{f}' found. Use "
"--compression-filter={f} to use it anyway.".format(
f=value))
raise QubesException("Invalid value for header: {}".format(key))
raise QubesException(f"Invalid value for header: {key}")
setattr(self, header.field, value)

self.validate()
Expand All @@ -256,23 +256,23 @@ def validate(self) -> None:
for key in expected_attrs:
if getattr(self, key) is None:
raise QubesException(
"Backup header lack '{}' info".format(key))
f"Backup header lack '{key}' info")
else:
raise QubesException(
"Unsupported backup version {}".format(self.version))
f"Unsupported backup version {self.version}")

def save(self, filename: str) -> None:
'''Save backup header into a file'''
with open(filename, "w", encoding='utf-8') as f_header:
# make sure 'version' is the first key
f_header.write('version={}\n'.format(self.version))
f_header.write(f'version={self.version}\n')
for key, header in self.known_headers.items():
if key == 'version':
continue
attr = header.field
if getattr(self, attr) is None:
continue
f_header.write("{!s}={!s}\n".format(key, getattr(self, attr)))
f_header.write(f"{key!s}={getattr(self, attr)!s}\n")

def launch_proc_with_pty(args: list[str], stdin: int | None=None,
stdout: int | None=None, stderr: int | None=None,
Expand Down Expand Up @@ -327,7 +327,7 @@ def launch_scrypt(action: str, input_name: str, output_name: str,
actual_prompt = typing.cast(IO, p.stderr).read(len(prompt))
if actual_prompt != prompt:
raise QubesException(
'Unexpected prompt from scrypt: {}'.format(actual_prompt))
f'Unexpected prompt from scrypt: {actual_prompt}')
pty.write(passphrase.encode('utf-8') + b'\n')
pty.flush()
# save it here, so garbage collector would not close it (which would kill
Expand Down Expand Up @@ -451,7 +451,7 @@ def collect_tar_output(self) -> None:
try:
new_lines = self.tar2_process.stderr \
.read(MAX_STDERR_BYTES).splitlines()
except IOError as e:
except OSError as e:
if e.errno == errno.EAGAIN:
return
raise
Expand Down Expand Up @@ -1128,7 +1128,7 @@ def load_hmac(hmac_text: str) -> str:
'''
if any(ord(x) not in range(128) for x in hmac_text):
raise QubesException(
"Invalid content of {}".format(hmacfile))
f"Invalid content of {hmacfile}")
hmac_text_list = hmac_text.strip().split("=")
if len(hmac_text_list) > 1:
hmac_text = hmac_text_list[1].strip()
Expand Down Expand Up @@ -1161,7 +1161,7 @@ def load_hmac(hmac_text: str) -> str:
with open(f_name + '.dec', 'rb') as f_two:
if f_one.read() != f_two.read():
raise QubesException(
'Invalid hmac on {}'.format(filename))
f'Invalid hmac on {filename}')
return True

with open(os.path.join(self.tmpdir, filename), 'rb') as f_input:
Expand All @@ -1174,10 +1174,10 @@ def load_hmac(hmac_text: str) -> str:

if hmac_stderr:
raise QubesException(
"ERROR: verify file {0}: {1}".format(filename, hmac_stderr))
f"ERROR: verify file {filename}: {hmac_stderr}")
self.log.debug("Loading hmac for file %s", filename)
try:
with open(os.path.join(self.tmpdir, hmacfile), 'r',
with open(os.path.join(self.tmpdir, hmacfile),
encoding='ascii') as f_hmac:
hmac = load_hmac(f_hmac.read())
except UnicodeDecodeError as err:
Expand Down Expand Up @@ -1627,7 +1627,7 @@ def new_name_for_conflicting_vm(self, orig_name: str,
while (new_name in restore_info.keys() or
new_name in [x.name for x in restore_info.values()] or
new_name in self.app.domains):
new_name = str('{}-{}'.format(orig_name, number))
new_name = str(f'{orig_name}-{number}')
number += 1
if number == 100:
# give up
Expand Down Expand Up @@ -1902,7 +1902,7 @@ def _handle_dom0(self, stream: BytesIO) -> None:
except KeyError:
home_dir = os.path.expanduser('~')
local_user = getpass.getuser()
restore_home_backupdir = "home-restore-{0}".format(
restore_home_backupdir = "home-restore-{}".format(
time.strftime("%Y-%m-%d-%H%M%S"))

self.log.info("Restoring home of user '%s' to '%s' directory...",
Expand Down
5 changes: 2 additions & 3 deletions qubesadmin/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
Expand Down Expand Up @@ -281,7 +280,7 @@ def _parse_type_value(self, prop_type, value):
prop_type = prop_type.decode('ascii')
if not prop_type.startswith('type='):
raise qubesadmin.exc.QubesDaemonCommunicationError(
'Invalid type prefix received: {}'.format(prop_type))
f'Invalid type prefix received: {prop_type}')
(_, prop_type) = prop_type.split('=', 1)
value = value.decode()
if prop_type == 'str':
Expand All @@ -303,7 +302,7 @@ def _parse_type_value(self, prop_type, value):
return None
return self.app.labels.get_blind(value)
raise qubesadmin.exc.QubesDaemonCommunicationError(
'Received invalid value type: {}'.format(prop_type))
f'Received invalid value type: {prop_type}')

def _fetch_all_properties(self):
"""
Expand Down
1 change: 0 additions & 1 deletion qubesadmin/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
Expand Down
Loading