diff --git a/tests/pyportaltest/templates/inputcapture.py b/tests/pyportaltest/templates/inputcapture.py new file mode 100644 index 00000000..2419ec14 --- /dev/null +++ b/tests/pyportaltest/templates/inputcapture.py @@ -0,0 +1,361 @@ +# SPDX-License-Identifier: LGPL-3.0-only +# +# This file is formatted with Python Black + +"""xdg desktop portals mock template""" + +from pyportaltest.templates import Request, Response, ASVType, Session +from typing import Callable, Dict, List, Tuple, Iterator +from itertools import count + +import dbus.service +import logging +import sys + +from gi.repository import GLib + +BUS_NAME = "org.freedesktop.portal.Desktop" +MAIN_OBJ = "/org/freedesktop/portal/desktop" +SYSTEM_BUS = False +MAIN_IFACE = "org.freedesktop.portal.InputCapture" + +logger = logging.getLogger(f"templates.{__name__}") +logger.setLevel(logging.DEBUG) + +zone_serial = None +eis_serial = None + + +def load(mock, parameters=None): + + logger.debug(f"Loading parameters: {parameters}") + + # Delay before Request.response, applies to all functions + mock.delay: int = parameters.get("delay", 0) + + # EIS serial number, < 0 means "don't send a serial" + eis_serial_start = parameters.get("eis-serial", 0) + if eis_serial_start >= 0: + global eis_serial + eis_serial = count(start=eis_serial_start) + + # Zone serial number, < 0 means "don't send a serial" + zone_serial_start = parameters.get("zone-serial", 0) + if zone_serial_start >= 0: + global zone_serial + zone_serial = count(start=zone_serial_start) + mock.current_zone_serial = next(zone_serial) + else: + mock.current_zone_serial = None + + # An all-zeroes zone means "don't send a zone" + mock.current_zones = parameters.get("zones", ((1920, 1080, 0, 0),)) + if mock.current_zones[0] == (0, 0, 0, 0): + mock.current_zones = None + + # second set of zones after the change signal + mock.changed_zones = parameters.get("changed-zones", ((0, 0, 0, 0),)) + if mock.changed_zones[0] == (0, 0, 0, 0): + mock.changed_zones = None + + # milliseconds until the zones change to the changed_zones + mock.change_zones_after = parameters.get("change-zones-after", 0) + + # List of barrier ids to fail + mock.failed_barriers = parameters.get("failed-barriers", []) + + # When to send the Activated signal (in ms after Enable), 0 means no + # signal + mock.activated_after = parameters.get("activated-after", 0) + + # Barrier ID that triggers Activated (-1 means don't add barrier id) + mock.activated_barrier = parameters.get("activated-barrier", None) + + # Position tuple for Activated signal, None means don't add position + mock.activated_position = parameters.get("activated-position", None) + + # When to send the Deactivated signal (in ms after Activated), 0 means no + # signal + mock.deactivated_after = parameters.get("deactivated-after", 0) + + # Position tuple for Deactivated signal, None means don't add position + mock.deactivated_position = parameters.get("deactivated-position", None) + + # When to send the Disabled signal (in ms after Enabled), 0 means no + # signal + mock.disabled_after = parameters.get("disabled-after", 0) + + mock.AddProperties( + MAIN_IFACE, + dbus.Dictionary( + { + "version": dbus.UInt32(parameters.get("version", 1)), + "SupportedCapabilities": dbus.UInt32( + parameters.get("capabilities", 0xF) + ), + } + ), + ) + + mock.active_sessions: Dict[str, Session] = {} + + +@dbus.service.method( + MAIN_IFACE, + sender_keyword="sender", + in_signature="sa{sv}", + out_signature="o", +) +def CreateSession(self, parent_window: str, options: ASVType, sender: str): + try: + request = Request(bus_name=self.bus_name, sender=sender, options=options) + session = Session(bus_name=self.bus_name, sender=sender, options=options) + + response = Response( + 0, + { + "capabilities": dbus.UInt32(0xF, variant_level=1), + "session_handle": dbus.ObjectPath(session.handle), + }, + ) + self.active_sessions[session.handle] = session + + logger.debug(f"CreateSession with response {response}") + request.respond(response, delay=self.delay) + + return request.handle + except Exception as e: + logger.critical(e) + + +@dbus.service.method( + MAIN_IFACE, + sender_keyword="sender", + in_signature="oa{sv}", + out_signature="h", +) +def ConnectToEIS(self, session_handle: str, options: ASVType, sender: str): + try: + import socket + + sockets = socket.socketpair() + # Write some random data down so it'll break anything that actually + # expects the socket to be a real EIS socket + sockets[0].send(b"VANILLA") + fd = sockets[1] + logger.debug(f"ConnectToEIS with fd {fd.fileno()}") + return dbus.types.UnixFd(fd) + except Exception as e: + logger.critical(e) + + +@dbus.service.method( + MAIN_IFACE, + sender_keyword="sender", + in_signature="oa{sv}", + out_signature="o", +) +def GetZones(self, session_handle: str, options: ASVType, sender: str): + try: + request = Request(bus_name=self.bus_name, sender=sender, options=options) + + if session_handle not in self.active_sessions: + request.respond(Response(2, {}, delay=self.delay)) + return request.handle + + serial = self.current_zone_serial + zones = self.current_zones + + results = {} + if serial is not None: + results["serial"] = dbus.UInt32(serial, variant_level=1) + if zones is not None: + results["zones"] = dbus.Array( + [dbus.Struct(z, signature="uuii") for z in zones], + signature="(uuii)", + variant_level=1, + ) + + response = Response(response=0, results=results) + + logger.debug(f"GetZones with response {response}") + request.respond(response, delay=self.delay) + + if self.change_zones_after > 0: + + def change_zones(): + logger.debug("Changing Zones") + old_serial = self.current_zone_serial + self.current_zone_serial = next(zone_serial) + self.current_zones = self.changed_zones + opts = {"serial": dbus.UInt32(old_serial, variant_level=1)} + self.EmitSignalDetailed( + "", + "ZonesChanged", + "oa{sv}", + [dbus.ObjectPath(session_handle), opts], + details={"destination": sender}, + ) + + GLib.timeout_add(self.change_zones_after, change_zones) + + return request.handle + except Exception as e: + logger.critical(e) + + +@dbus.service.method( + MAIN_IFACE, + sender_keyword="sender", + in_signature="oa{sv}aa{sv}u", + out_signature="o", +) +def SetPointerBarriers( + self, + session_handle: str, + options: ASVType, + barriers: List[ASVType], + serial: int, + sender: str, +): + try: + request = Request(bus_name=self.bus_name, sender=sender, options=options) + + if ( + session_handle not in self.active_sessions + or serial != self.current_zone_serial + ): + response = Response(2, {}) + else: + results = { + "failed_barriers": dbus.Array( + self.failed_barriers, signature="u", variant_level=1 + ) + } + response = Response(0, results) + + logger.debug(f"SetPointerBarriers with response {response}") + request.respond(response, delay=self.delay) + + return request.handle + except Exception as e: + logger.critical(e) + + +@dbus.service.method( + MAIN_IFACE, + sender_keyword="sender", + in_signature="oa{sv}", + out_signature="", +) +def Enable(self, session_handle, options, sender): + try: + logger.debug(f"Enable with options {options}") + allowed_options = [] + + if not all([k in allowed_options for k in options]): + logger.error("Enable does not support options") + + if self.activated_after > 0: + + current_eis_serial = next(eis_serial) if eis_serial else None + + def send_activated(): + opts = {} + if current_eis_serial is not None: + opts["serial"] = dbus.UInt32(current_eis_serial, variant_level=1) + + if self.activated_position is not None: + opts["cursor_position"] = dbus.Struct( + self.activated_position, signature="dd", variant_level=1 + ) + if self.activated_barrier is not None: + opts["barrier_id"] = dbus.UInt32( + self.activated_barrier, variant_level=1 + ) + + self.EmitSignalDetailed( + "", + "Activated", + "oa{sv}", + [dbus.ObjectPath(session_handle), opts], + details={"destination": sender}, + ) + + GLib.timeout_add(self.activated_after, send_activated) + + if self.deactivated_after > 0: + + def send_deactivated(): + opts = {} + if current_eis_serial: + opts["serial"] = dbus.UInt32( + current_eis_serial, variant_level=1 + ) + + if self.deactivated_position is not None: + opts["cursor_position"] = dbus.Struct( + self.deactivated_position, signature="dd", variant_level=1 + ) + + self.EmitSignalDetailed( + "", + "Deactivated", + "oa{sv}", + [dbus.ObjectPath(session_handle), opts], + details={"destination": sender}, + ) + + GLib.timeout_add( + self.activated_after + self.deactivated_after, send_deactivated + ) + + if self.disabled_after > 0: + + def send_disabled(): + self.EmitSignalDetailed( + "", + "Disabled", + "oa{sv}", + [dbus.ObjectPath(session_handle), {}], + details={"destination": sender}, + ) + + GLib.timeout_add(self.disabled_after, send_disabled) + + except Exception as e: + logger.critical(e) + + +@dbus.service.method( + MAIN_IFACE, + sender_keyword="sender", + in_signature="oa{sv}", + out_signature="", +) +def Disable(self, session_handle, options, sender): + try: + logger.debug(f"Disable with options {options}") + allowed_options = [] + + if not all([k in allowed_options for k in options]): + logger.error("Disable does not support options") + except Exception as e: + logger.critical(e) + + +@dbus.service.method( + MAIN_IFACE, + sender_keyword="sender", + in_signature="oa{sv}", + out_signature="", +) +def Release(self, session_handle, options, sender): + try: + logger.debug(f"Release with options {options}") + allowed_options = ["cursor_position"] + + if not all([k in allowed_options for k in options]): + logger.error("Invalid options for Release") + except Exception as e: + logger.critical(e) diff --git a/tests/pyportaltest/test_inputcapture.py b/tests/pyportaltest/test_inputcapture.py new file mode 100644 index 00000000..ccc1a470 --- /dev/null +++ b/tests/pyportaltest/test_inputcapture.py @@ -0,0 +1,498 @@ +# SPDX-License-Identifier: LGPL-3.0-only +# +# This file is formatted with Python Black + +from . import PortalTest +from typing import List, Optional + +import gi +import logging +import pytest +import os + +gi.require_version("Xdp", "1.0") +from gi.repository import GLib, Gio, Xdp + +logger = logging.getLogger(f"test.{__name__}") +logger.setLevel(logging.DEBUG) + + +class SessionSetup: + def __init__( + self, + session: Xdp.InputCaptureSession = None, + zones: List[Xdp.InputCaptureZone] = None, + barriers: List[Xdp.InputCapturePointerBarrier] = None, + failed_barriers: List[Xdp.InputCapturePointerBarrier] = None, + ): + self.session = session + self.zones = zones or [] + self.barriers = barriers or [] + self.failed_barriers = failed_barriers or [] + + +class SessionCreationFailed(Exception): + def __init__(self, glib_error): + self.glib_error = glib_error + + def __str__(self): + return f"SessionCreationFailed: {self.glib_error}" + + +class TestInputCapture(PortalTest): + def create_session_with_barriers( + self, + params=None, + parent=None, + capabilities=Xdp.Capability.POINTER_RELATIVE, + barriers=None, + allow_failed_barriers=False, + cancellable=None, + ) -> SessionSetup: + """ + Session creation helper. This function creates a session and sets up + pointer barriers, with defaults for everything. + """ + params = params or {} + self.setup_daemon(params) + + xdp = Xdp.Portal.new() + assert xdp is not None + + session, session_error = None, None + create_session_done_invoked = False + + def create_session_done(portal, task, data): + nonlocal session, session_error + nonlocal create_session_done_invoked + + create_session_done_invoked = True + try: + session = portal.create_input_capture_session_finish(task) + if session is None: + session_error = Exception("Session is NULL") + except GLib.GError as e: + session_error = e + self.mainloop.quit() + + xdp.create_input_capture_session( + parent=parent, + capabilities=capabilities, + cancellable=cancellable, + callback=create_session_done, + data=None, + ) + + self.mainloop.run() + assert create_session_done_invoked + if session_error is not None: + raise SessionCreationFailed(session_error) + + zones = session.get_zones() + + if barriers is None: + barriers = [Xdp.InputCapturePointerBarrier(id=1, x1=0, x2=1920, y1=0, y2=0)] + + signals = [None] * len(barriers) + + def sig_cb(barrier, active, idx): + nonlocal signals + + signals[idx] = active + if all([x is not None for x in signals]): + self.mainloop.quit() + + for idx, b in enumerate(barriers): + b.connect("active", sig_cb, idx) + + session.set_pointer_barriers(barriers) + self.mainloop.run() + + succeeded_barriers = [b for idx, b in enumerate(barriers) if signals[idx]] + failed_barriers = [b for idx, b in enumerate(barriers) if not signals[idx]] + + assert all([b.props.is_active for b in succeeded_barriers]) + assert all([not b.props.is_active for b in failed_barriers]) + + if not allow_failed_barriers: + assert ( + failed_barriers == [] + ), "Barriers failed but allow_failed_barriers was not set" + + return SessionSetup( + session=session, + zones=zones, + barriers=succeeded_barriers, + failed_barriers=failed_barriers, + ) + + def test_version(self): + """This tests the test suite setup rather than libportal""" + params = {} + self.setup_daemon(params) + assert self.properties_interface.Get(self.INTERFACE_NAME, "version") == 1 + + def test_session_create(self): + """ + The basic test of successful create and zone check + """ + params = { + "zones": [(1920, 1080, 0, 0), (1080, 1920, 1920, 1080)], + "zone-serial": 1234, + } + self.setup_daemon(params) + + capabilities = Xdp.Capability.POINTER_RELATIVE | Xdp.Capability.KEYBOARD + + setup = self.create_session_with_barriers(params, capabilities=capabilities) + assert setup.session is not None + zones = setup.zones + assert len(zones) == 2 + z1 = zones[0] + assert z1.props.width == 1920 + assert z1.props.height == 1080 + assert z1.props.x == 0 + assert z1.props.y == 0 + assert z1.props.serial == 1234 + + z2 = zones[1] + assert z2.props.width == 1080 + assert z2.props.height == 1920 + assert z2.props.x == 1920 + assert z2.props.y == 1080 + assert z2.props.serial == 1234 + + # Now verify our DBus calls were correct + method_calls = self.mock_interface.GetMethodCalls("CreateSession") + assert len(method_calls) == 1 + _, args = method_calls.pop(0) + parent, options = args + assert list(options.keys()) == [ + "handle_token", + "session_handle_token", + "capabilities", + ] + assert options["capabilities"] == capabilities + + method_calls = self.mock_interface.GetMethodCalls("GetZones") + assert len(method_calls) == 1 + _, args = method_calls.pop(0) + session_handle, options = args + assert list(options.keys()) == ["handle_token"] + + def test_session_create_cancel_during_create(self): + """ + Create a session but cancel while waiting for the CreateSession request + """ + params = {"delay": 1000} + self.setup_daemon(params) + cancellable = Gio.Cancellable() + GLib.timeout_add(300, cancellable.cancel) + + with pytest.raises(SessionCreationFailed) as e: + self.create_session_with_barriers(params=params, cancellable=cancellable) + assert "Operation was cancelled" in e.glib_error.message + + def test_session_create_cancel_during_getzones(self): + """ + Create a session but cancel while waiting for the GetZones request + """ + # libportal issues two requests: CreateSession and GetZones, + # param is set for each to delay 500 ms so if we cancel after 700, the + # one that is cancelled should be the GetZones one. + # Can't guarantee it but this is the best we can do + params = {"delay": 500} + self.setup_daemon(params) + cancellable = Gio.Cancellable() + GLib.timeout_add(700, cancellable.cancel) + + with pytest.raises(SessionCreationFailed) as e: + self.create_session_with_barriers(params=params, cancellable=cancellable) + assert "Operation was cancelled" in e.glib_error.message + + def test_session_create_no_serial_on_getzones(self): + """ + Test buggy portal implementation not replying with a serial in + GetZones + """ + params = { + "zone-serial": -1, + } + + self.setup_daemon(params) + with pytest.raises(SessionCreationFailed): + self.create_session_with_barriers(params) + + def test_session_create_no_zones_on_getzones(self): + """ + Test buggy portal implementation not replying with a serial in + GetZones + """ + params = { + "zones": [(0, 0, 0, 0)], + } + + self.setup_daemon(params) + with pytest.raises(SessionCreationFailed): + self.create_session_with_barriers(params) + + def test_connect_to_eis(self): + """ + The basic test of retrieving the EIS handle + """ + params = {} + self.setup_daemon(params) + setup = self.create_session_with_barriers(params) + assert setup.session is not None + + handle = setup.session.connect_to_eis() + assert handle >= 0 + + fd = os.fdopen(handle) + buf = fd.read() + assert buf == "VANILLA" # template sends this by default + + # Now verify our DBus calls were correct + method_calls = self.mock_interface.GetMethodCalls("ConnectToEIS") + assert len(method_calls) == 1 + _, args = method_calls.pop(0) + parent, options = args + assert "handle_token" not in options # This is not a Request + assert list(options.keys()) == [] + + def test_pointer_barriers_success(self): + """ + Some successful pointer barriers + """ + b1 = Xdp.InputCapturePointerBarrier(id=1, x1=0, x2=1920, y1=0, y2=0) + b2 = Xdp.InputCapturePointerBarrier(id=2, x1=1920, x2=1920, y1=0, y2=1080) + + params = {} + self.setup_daemon(params) + setup = self.create_session_with_barriers(params, barriers=[b1, b2]) + assert setup.barriers == [b1, b2] + + # Now verify our DBus calls were correct + method_calls = self.mock_interface.GetMethodCalls("SetPointerBarriers") + assert len(method_calls) == 1 + _, args = method_calls.pop(0) + session_handle, options, barriers, serial = args + assert list(options.keys()) == ["handle_token"] + for b in barriers: + assert "barrier_id" in b + assert "position" in b + assert b["barrier_id"] in [1, 2] + x1, y1, x2, y2 = [int(x) for x in b["position"]] + if b["barrier_id"] == 1: + assert (x1, y1, x2, y2) == (0, 0, 1920, 0) + if b["barrier_id"] == 2: + assert (x1, y1, x2, y2) == (1920, 0, 1920, 1080) + + def test_pointer_barriers_failures(self): + """ + Test with some barriers failing + """ + b1 = Xdp.InputCapturePointerBarrier(id=1, x1=0, x2=1920, y1=0, y2=0) + b2 = Xdp.InputCapturePointerBarrier(id=2, x1=1, x2=2, y1=3, y2=4) + b3 = Xdp.InputCapturePointerBarrier(id=3, x1=1, x2=2, y1=3, y2=4) + b4 = Xdp.InputCapturePointerBarrier(id=4, x1=1920, x2=1920, y1=0, y2=1080) + + params = {"failed-barriers": [2, 3]} + self.setup_daemon(params) + setup = self.create_session_with_barriers( + params, barriers=[b1, b2, b3, b4], allow_failed_barriers=True + ) + assert setup.barriers == [b1, b4] + assert setup.failed_barriers == [b2, b3] + + # Now verify our DBus calls were correct + method_calls = self.mock_interface.GetMethodCalls("SetPointerBarriers") + assert len(method_calls) == 1 + _, args = method_calls.pop(0) + session_handle, options, barriers, serial = args + assert list(options.keys()) == ["handle_token"] + for b in barriers: + assert "barrier_id" in b + assert "position" in b + assert b["barrier_id"] in [1, 2, 3, 4] + x1, y1, x2, y2 = [int(x) for x in b["position"]] + if b["barrier_id"] == 1: + assert (x1, y1, x2, y2) == (0, 0, 1920, 0) + if b["barrier_id"] in [2, 3]: + assert (x1, y1, x2, y2) == (1, 3, 2, 4) + if b["barrier_id"] == 4: + assert (x1, y1, x2, y2) == (1920, 0, 1920, 1080) + + def test_enable_disable_release(self): + """ + Test enable/disable calls + """ + params = {} + self.setup_daemon(params) + + setup = self.create_session_with_barriers(params) + session = setup.session + + session.enable() + session.disable() + session.release() + + self.mainloop.run() + + # Now verify our DBus calls were correct + method_calls = self.mock_interface.GetMethodCalls("Enable") + assert len(method_calls) == 1 + _, args = method_calls.pop(0) + session_handle, options = args + assert list(options.keys()) == [] + + method_calls = self.mock_interface.GetMethodCalls("Disable") + assert len(method_calls) == 1 + _, args = method_calls.pop(0) + session_handle, options = args + assert list(options.keys()) == [] + + method_calls = self.mock_interface.GetMethodCalls("Release") + assert len(method_calls) == 1 + _, args = method_calls.pop(0) + session_handle, options = args + assert list(options.keys()) == [] + + def test_release_at(self): + """ + Test the release_at call with a cursor position + """ + params = {} + self.setup_daemon(params) + + setup = self.create_session_with_barriers(params) + session = setup.session + + # libportal allows us to call Release without Enable first + session.release_at(cursor_x_position=10, cursor_y_position=10) + self.mainloop.run() + + # Now verify our DBus calls were correct + method_calls = self.mock_interface.GetMethodCalls("Release") + assert len(method_calls) == 1 + _, args = method_calls.pop(0) + session_handle, options = args + assert list(options.keys()) == ["cursor_position"] + cursor_position = options["cursor_position"] + assert cursor_position == (10.0, 10.0) + + def test_activated(self): + """ + Test the Activated signal + """ + params = { + "eis-serial": 123, + "activated-after": 20, + "activated-barrier": 1, + "activated-position": (10.0, 20.0), + "deactivated-after": 20, + "deactivated-position": (20.0, 30.0), + } + self.setup_daemon(params) + + setup = self.create_session_with_barriers(params) + session = setup.session + + session_activated_signal_received = False + session_deactivated_signal_received = False + signal_activated_options = None + signal_deactivated_options = None + + def session_activated(session, opts): + nonlocal session_activated_signal_received, signal_activated_options + session_activated_signal_received = True + signal_activated_options = opts + + def session_deactivated(session, opts): + nonlocal session_deactivated_signal_received, signal_deactivated_options + session_deactivated_signal_received = True + signal_deactivated_options = opts + self.mainloop.quit() + + session.connect("activated", session_activated) + session.connect("deactivated", session_deactivated) + session.enable() + + self.mainloop.run() + + assert session_activated_signal_received + assert list(signal_activated_options.keys()) == [ + "serial", + "cursor_position", + "barrier_id", + ] + assert signal_activated_options["barrier_id"] == 1 + assert signal_activated_options["cursor_position"] == (10.0, 20.0) + assert signal_activated_options["serial"] == 123 + + assert session_deactivated_signal_received + assert list(signal_deactivated_options.keys()) == ["serial", "cursor_position"] + assert signal_deactivated_options["cursor_position"] == (20.0, 30.0) + assert signal_deactivated_options["serial"] == 123 + + def test_zones_changed(self): + """ + Test the ZonesChanged signal + """ + params = { + "zones": [(1920, 1080, 0, 0), (1080, 1920, 1920, 1080)], + "changed-zones": [(1024, 768, 0, 0)], + "change-zones-after": 200, + "zone-serial": 567, + } + self.setup_daemon(params) + + setup = self.create_session_with_barriers(params) + session = setup.session + + signal_received = False + signal_options = None + + def zones_changed(session, opts): + nonlocal signal_received, signal_options + signal_received = True + signal_options = opts + self.mainloop.quit() + + session.connect("zones-changed", zones_changed) + + self.mainloop.run() + + assert signal_received + assert signal_options is not None + assert list(signal_options.keys()) == ["serial"] + assert signal_options["serial"] == 567 + + assert all([z.props.serial == 568 for z in session.get_zones()]) + + def test_disabled(self): + """ + Test the Disabled signal + """ + params = { + "disabled-after": 20, + } + self.setup_daemon(params) + + setup = self.create_session_with_barriers(params) + session = setup.session + + disabled_signal_received = False + + def session_disabled(session): + nonlocal disabled_signal_received + disabled_signal_received = True + self.mainloop.quit() + + session.connect("disabled", session_disabled) + + session.enable() + + self.mainloop.run() + + assert disabled_signal_received