Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions sssd_test_framework/misc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,26 @@ def attrs_parse(lines: list[str], attrs: list[str] | None = None) -> dict[str, l
return out


def delimiter_parse(lines: list[str], delimiter: str = ":") -> dict[str, str]:
"""
Parse delimited lines from output.

:param lines: Output.
:type lines: list[str]
:param delimiter: Delimiter, optional
:type delimiter: str, defaults to :
:return: Dictionary with first element as the name as a key.
:rtype: dict[str, str]
"""
out: dict[str, str] = {}

for item in lines:
key, value = item.split(delimiter, 1)
out[key.strip()] = value.strip()

return out


def attrs_include_value(attr: Any | list[Any] | None, value: Any) -> list[Any]:
"""
Include ``value`` to attribute list if it is not yet present.
Expand Down
59 changes: 59 additions & 0 deletions sssd_test_framework/roles/ipa.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ..misc import (
attrs_include_value,
attrs_parse,
delimiter_parse,
get_attr,
ip_version,
to_list,
Expand Down Expand Up @@ -1192,6 +1193,20 @@ def test_example(client: Client, ipa: IPA):
"""
return IDUserOverride(self)

def subid(self) -> IPASubID:
"""
IPA subid management.

.. code-block:: python
:caption: Example usage

@pytest.mark.topology(KnownTopology.IPA)
def test_ipa__subids_configured(ipa: IPA):
user = ipa.user("user1").add()
user.subid().generate()
"""
return IPASubID(self.role, self.name)


class IDUserOverride(IPAUser):
"""
Expand Down Expand Up @@ -1400,6 +1415,50 @@ def show_override(self, idview_name: str) -> dict[str, list[str]]:
return attrs_parse(lines)


class IPASubID(BaseObject[IPAHost, IPA]):
"""
IPA sub id management.
"""

def __init__(self, role: IPA, user: str) -> None:
"""
:param user: Username.
:type user: str
"""
super().__init__(role)

self.name = user
""" Owner name."""

self.uid_start: int | None = None
""" SubUID range start"""

self.uid_size: int | None = None
""" SubUID range size."""

self.gid_start: int | None = None
""" SubGID range start."""

self.gid_size: int | None = None
""" SubGID range size."""

def generate(self) -> IPASubID:
"""
Generate subordinate id.
"""
self.host.conn.run(f"ipa subid-generate --owner {self.name}")
result = self.host.conn.run(f"ipa subid-find --owner {self.name}").stdout_lines
result = [item for item in result if ":" in item]
subids = delimiter_parse(result)

self.uid_start = int(subids["SubUID range start"]) if subids.get("SubUID range start") else None
self.uid_size = int(subids["SubUID range size"]) if subids.get("SubUID range size") else None
self.gid_start = int(subids["SubGID range start"]) if subids.get("SubGID range start") else None
self.gid_size = int(subids["SubGID range size"]) if subids.get("SubGID range size") else None

return self


class IPAGroup(IPAObject):
"""
IPA group management.
Expand Down
6 changes: 6 additions & 0 deletions sssd_test_framework/utils/sssd.py
Original file line number Diff line number Diff line change
Expand Up @@ -1081,3 +1081,9 @@ def socket_responders(self, responders: list[str] | None = None) -> None:
for responder in responders:
socket_unit = f"sssd-{responder}.socket"
self.sssd.svc.start(socket_unit)

def subid(self) -> None:
"""
Configure SSSD for subid.
"""
self.sssd.authselect.select("sssd", ["with-subid"])
67 changes: 67 additions & 0 deletions sssd_test_framework/utils/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,52 @@ def FromOutput(cls, stdout: str) -> ServicesEntry:
return cls.FromDict(result)


class SubIDEntry(object):
"""
Result of 'getsubids'.
"""

def __init__(self, name: str, range_start: int, range_size: int) -> None:

self.name: str = name
""" User name"""

self.range_start: int = range_start
""" SubID range start """

self.range_size: int = range_size
""" SubID range size """

def __str__(self) -> str:
return f"owner:{self.name}, start:{self.range_start}, size:{self.range_size})"

def __repr__(self) -> str:
return str(self)

@classmethod
def FromDict(cls, d: dict[str, Any]) -> SubIDEntry:
return cls(
name=d.get("name", ""),
range_start=d.get("range_start", 0),
range_size=d.get("range_size", 0),
)

@classmethod
def FromOutput(cls, stdout: str) -> SubIDEntry:
line = stdout.strip()
if ":" in line:
parts = line.split(":", 1)
if len(parts) == 2:
rest = parts[1].strip().split()
if len(rest) == 3:
return cls(
name=rest[0],
range_start=int(rest[1]),
range_size=int(rest[2]),
)
return cls(name="", range_start=0, range_size=0)


class LinuxToolsUtils(MultihostUtility[MultihostHost]):
"""
Run various standard commands on remote host.
Expand All @@ -762,6 +808,27 @@ def __init__(self, host: MultihostHost, fs: LinuxFileSystem) -> None:
self.__fs: LinuxFileSystem = fs
self.__rollback: list[str] = []

def getsubid(self, name: str, group: bool = False) -> SubIDEntry | None:
"""
Call ``getsubids $name``

:param name: User name.
:type name: str
:param group: Get group range switch, optional
:type group: bool, defaults to False
:return: SubIDEntry data, None if not found
:type: SubIDEntry | None
"""
args = ""
if group:
args = "-g"

command = self.host.conn.run(f"getsubids {args} {name}", raise_on_error=False)
if command.rc != 0:
return None

return SubIDEntry.FromOutput(command.stdout)

def id(self, name: str | int) -> IdEntry | None:
"""
Run ``id`` command.
Expand Down
20 changes: 20 additions & 0 deletions tests/test_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
attrs_include_value,
attrs_parse,
attrs_to_hash,
delimiter_parse,
get_attr,
ip_to_ptr,
ip_version,
Expand Down Expand Up @@ -86,6 +87,25 @@ def test_attrs_parse__long_line(input, expected):
assert attrs_parse(input) == expected


@pytest.mark.parametrize(
"input,expected, delimiter",
[
(
["Unique ID: 5fe04e66-da53-4ac0-94f3-fd0cd5cefd6d", "Owner: user1"],
{"Unique ID": "5fe04e66-da53-4ac0-94f3-fd0cd5cefd6d", "Owner": "user1"},
":",
),
(
["Unique ID, 5fe04e66-da53-4ac0-94f3-fd0cd5cefd6d", "Owner, user1"],
{"Unique ID": "5fe04e66-da53-4ac0-94f3-fd0cd5cefd6d", "Owner": "user1"},
",",
),
],
)
def test_delimiter_parse(input, expected, delimiter):
assert delimiter_parse(input, delimiter=delimiter) == expected


@pytest.mark.parametrize(
"input,expected",
[
Expand Down
Loading