diff --git a/qubes/tests/integ/basic.py b/qubes/tests/integ/basic.py index 1f574dd3e..eef299eaa 100644 --- a/qubes/tests/integ/basic.py +++ b/qubes/tests/integ/basic.py @@ -472,71 +472,80 @@ def setUp(self): super(TC_30_Gui_daemon, self).setUp() self.init_default_template() - @unittest.skipUnless( - spawn.find_executable('xdotool'), - "xdotool not installed") - def test_000_clipboard(self): - testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM, - name=self.make_vm_name('vm1'), label='red') - self.loop.run_until_complete(testvm1.create_on_disk()) - testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM, - name=self.make_vm_name('vm2'), label='red') - self.loop.run_until_complete(testvm2.create_on_disk()) + async def _test_clipboard(self, test_string, + set_features=None, + expect_content=None, + expect_source_name=None): + testvm1 = self.app.add_new_vm( + qubes.vm.appvm.AppVM, + name=self.make_vm_name('vm1'), label='red') + await testvm1.create_on_disk() + testvm2 = self.app.add_new_vm( + qubes.vm.appvm.AppVM, + name=self.make_vm_name('vm2'), label='red') + await testvm2.create_on_disk() self.app.save() - self.loop.run_until_complete(asyncio.gather( + for feature, value in (set_features or {}).items(): + testvm1.features[feature] = value + + await asyncio.gather( testvm1.start(), - testvm2.start())) - self.loop.run_until_complete(asyncio.gather( + testvm2.start()) + await asyncio.gather( self.wait_for_session(testvm1), - self.wait_for_session(testvm2))) + self.wait_for_session(testvm2)) + p = await testvm1.run("cat > /tmp/source.txt", stdin=subprocess.PIPE) + await p.communicate(test_string.encode()) window_title = 'user@{}'.format(testvm1.name) - self.loop.run_until_complete(testvm1.run( - 'zenity --text-info --editable --title={}'.format(window_title))) + await testvm1.run( + 'zenity --text-info ' + '--filename=/tmp/source.txt ' + '--editable ' + '--title={}'.format(window_title)) - self.wait_for_window(window_title) - self.loop.run_until_complete(asyncio.sleep(5)) - test_string = "test{}".format(testvm1.xid) + await self.wait_for_window_coro(window_title) + await asyncio.sleep(5) - # Type and copy some text - subprocess.check_call(['xdotool', 'search', '--name', window_title, - 'windowactivate', '--sync', - 'type', test_string]) - self.loop.run_until_complete(asyncio.sleep(5)) # second xdotool call because type --terminator do not work (SEGV) # additionally do not use search here, so window stack will be empty # and xdotool will use XTEST instead of generating events manually - # this will be much better - at least because events will have # correct timestamp (so gui-daemon would not drop the copy request) - subprocess.check_call(['xdotool', - 'key', 'ctrl+a', 'ctrl+c', 'ctrl+shift+c', - 'Escape']) + subprocess.check_call(['xdotool', 'key', 'ctrl+a', 'ctrl+c']) + # wait a bit to let the zenity actually copy + await asyncio.sleep(1) + subprocess.check_call(['xdotool', 'key', 'ctrl+shift+c', 'Escape']) - self.wait_for_window(window_title, show=False) + await self.wait_for_window_coro(window_title, show=False) clipboard_content = \ open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip() + + if expect_content is not None: + test_string = expect_content self.assertEqual(clipboard_content, test_string, "Clipboard copy operation failed - content") + if expect_source_name is None: + expect_source_name = testvm1.name clipboard_source = \ open('/var/run/qubes/qubes-clipboard.bin.source', 'r').read().strip() - self.assertEqual(clipboard_source, testvm1.name, + self.assertEqual(clipboard_source, expect_source_name, "Clipboard copy operation failed - owner") # Then paste it to the other window window_title = 'user@{}'.format(testvm2.name) - p = self.loop.run_until_complete(testvm2.run( - 'zenity --entry --title={} > /tmp/test.txt'.format(window_title))) - self.wait_for_window(window_title) + p = await testvm2.run( + 'zenity --text-info --editable --title={} > /tmp/test.txt'.format(window_title)) + await self.wait_for_window_coro(window_title) subprocess.check_call(['xdotool', 'key', '--delay', '100', 'ctrl+shift+v', 'ctrl+v', 'Return', 'alt+o']) - self.loop.run_until_complete(p.wait()) + await p.wait() # And compare the result - (test_output, _) = self.loop.run_until_complete( - testvm2.run_for_stdio('cat /tmp/test.txt')) + (test_output, _) = await testvm2.run_for_stdio('cat /tmp/test.txt') self.assertEqual(test_string, test_output.strip().decode('ascii')) clipboard_content = \ @@ -549,6 +558,46 @@ def test_000_clipboard(self): self.assertEqual(clipboard_source, "", "Clipboard not wiped after paste - owner") + @unittest.skipUnless( + spawn.find_executable('xdotool'), + "xdotool not installed") + def test_000_clipboard(self): + test_string = "test123" + self.loop.run_until_complete(self._test_clipboard(test_string)) + + @unittest.skipUnless( + spawn.find_executable('xdotool'), + "xdotool not installed") + def test_001_clipboard_64k(self): + test_string = "test123abc" * 6400 + self.loop.run_until_complete(self._test_clipboard(test_string)) + + @unittest.skipUnless( + spawn.find_executable('xdotool'), + "xdotool not installed") + def test_002_clipboard_200k_truncated(self): + test_string = "test123abc" * 20000 + self.loop.run_until_complete(self._test_clipboard(test_string, + expect_content="", expect_source_name="")) + + @unittest.skipUnless( + spawn.find_executable('xdotool'), + "xdotool not installed") + def test_002_clipboard_200k(self): + test_string = "test123abc" * 20000 + self.loop.run_until_complete(self._test_clipboard(test_string, + set_features={"gui-max-clipboard-size": 200_000})) + + @unittest.skipUnless( + spawn.find_executable('xdotool'), + "xdotool not installed") + def test_002_clipboard_300k(self): + test_string = "test123abc" * 30000 + self.loop.run_until_complete(self._test_clipboard(test_string, + expect_content="Qube clipboard size over 256KiB and X11 INCR " + "protocol support is not implemented!")) + + class TC_05_StandaloneVMMixin(object): def setUp(self): super(TC_05_StandaloneVMMixin, self).setUp()