diff --git a/quarkchain/p2p/nat.py b/quarkchain/p2p/nat.py index 2dbefb392..bfb8e4cde 100644 --- a/quarkchain/p2p/nat.py +++ b/quarkchain/p2p/nat.py @@ -1,48 +1,21 @@ + +import aiohttp import asyncio -from concurrent.futures import ThreadPoolExecutor -import ipaddress -from typing import List, NamedTuple -from urllib.parse import urlparse +import socket +from contextlib import suppress +from typing import Optional from quarkchain.p2p.cancel_token.token import CancelToken, OperationCancelled -from quarkchain.p2p.exceptions import NoInternalAddressMatchesDevice from quarkchain.p2p.service import BaseService -import netifaces -import upnpclient +from async_upnp_client.aiohttp import AiohttpSessionRequester +from async_upnp_client.client_factory import UpnpFactory +from async_upnp_client.search import async_search # UPnP discovery can take a long time, so use a loooong timeout here. UPNP_DISCOVER_TIMEOUT_SECONDS = 30 -PortMapping = NamedTuple( - "PortMapping", - [ - ("internal", str), # of the form "192.2.3.4:56" - ("external", str), # of the form "192.2.3.4:56" - ], -) - - -def find_internal_ip_on_device_network(upnp_dev: upnpclient.upnp.Device) -> str: - """ - For a given UPnP device, return the internal IP address of this host machine that can - be used for a NAT mapping. - """ - parsed_url = urlparse(upnp_dev.location) - # Get an ipaddress.IPv4Network instance for the upnp device's network. - upnp_dev_net = ipaddress.ip_network(parsed_url.hostname + "/24", strict=False) - for iface in netifaces.interfaces(): - for family, addresses in netifaces.ifaddresses(iface).items(): - # TODO: Support IPv6 addresses as well. - if family != netifaces.AF_INET: - continue - for item in addresses: - if ipaddress.ip_address(item["addr"]) in upnp_dev_net: - return item["addr"] - raise NoInternalAddressMatchesDevice(device_hostname=parsed_url.hostname) - - class UPnPService(BaseService): """ Generate a mapping of external network IP address/port to internal IP address/port, @@ -58,9 +31,46 @@ def __init__(self, port: int, token: CancelToken = None) -> None: """ super().__init__(token) self.port = port - self._mapping = ( - None - ) # : PortMapping when called externally, this never returns None + self._session = None + self._service = None + + + # ----------------------------- + # Public API + # ----------------------------- + + async def discover(self) -> Optional[str]: + """ + Discover router and create initial port mapping. + Returns external IP if successful. + """ + self._session = aiohttp.ClientSession() + await self._discover(self._session) + + if not self._service: + self.logger.warning("No UPnP WANIP service found") + await self._close_session() + return None + + await self._add_port_mapping() + + return await self._get_external_ip() + + + async def stop(self): + await self._delete_port_mapping() + await self._close_session() + + async def _close_session(self): + if self._session: + await self._session.close() + self._session = None + + + + # ----------------------------- + # Internal logic + # ----------------------------- async def _run(self) -> None: """Run an infinite loop refreshing our NAT port mapping. @@ -72,112 +82,127 @@ async def _run(self) -> None: try: # Wait for the port mapping lifetime, and then try registering it again await self.wait(asyncio.sleep(self._nat_portmap_lifetime)) - await self.add_nat_portmap() + if self._service: + await self._add_port_mapping() except OperationCancelled: break except Exception: self.logger.exception("Failed to setup NAT portmap") - async def add_nat_portmap(self) -> str: - """ - Set up the port mapping - :return: the IP address of the new mapping (or None if failed) - """ - self.logger.info("Setting up NAT portmap...") - try: - devices = await self._discover_upnp_devices() - if devices: - self.logger.info( - "Adding NAT port map on {} devices...".format(len(devices)) - ) - for upnp_dev in devices: - try: - external_ip = self._add_nat_portmap(upnp_dev) - except NoInternalAddressMatchesDevice as exc: - self.logger.info( - "No internal addresses were managed by the UPnP device at %s", - exc.device_hostname, - ) - continue - else: - return external_ip - except upnpclient.soap.SOAPError as e: - if e.args == (718, "ConflictInMappingEntry"): - # An entry already exists with the parameters we specified. Maybe the router - # didn't clean it up after it expired or it has been configured by other piece - # of software, either way we should not override it. - # https://tools.ietf.org/id/draft-ietf-pcp-upnp-igd-interworking-07.html#errors - self.logger.info( - "NAT port mapping already configured, not overriding it" - ) - else: - self.logger.exception("Failed to setup NAT portmap") + async def _discover(self, session): + requester = AiohttpSessionRequester(session) + factory = UpnpFactory(requester) - self.logger.warning("No NAT mapping has been set") - self._mapping = None - return None + async def on_response(response): + try: + device = await factory.async_create_device(response.location) - def current_mapping(self) -> PortMapping: - if self._mapping is None: - unbound = ":%d" % self.port - return PortMapping(unbound, unbound) - else: - return self._mapping + for service in device.services.values(): + if "WANIPConn" in service.service_type: + self._service = service + self.logger.info("Found UPnP WANIP service") + return + except Exception as e: + self.logger.debug(f"Ignoring device: {e}") + + await async_search(on_response, timeout=UPNP_DISCOVER_TIMEOUT_SECONDS) - def _add_nat_portmap(self, upnp_dev: upnpclient.upnp.Device) -> str: - # Detect our internal IP address (which raises if there are no matches) - internal_ip = find_internal_ip_on_device_network(upnp_dev) - external_ip = upnp_dev.WANIPConn1.GetExternalIPAddress()["NewExternalIPAddress"] + async def _add_port_mapping(self): + internal_ip = self._get_internal_ip() + + self.logger.info( + f"Adding port mapping {self.port}->{internal_ip}:{self.port}" + ) + for protocol, description in [ ("TCP", "ethereum p2p"), ("UDP", "ethereum discovery"), ]: - upnp_dev.WANIPConn1.AddPortMapping( - NewRemoteHost=external_ip, + await self._service.async_call_action( + "AddPortMapping", + NewRemoteHost="", # should we use _get_external_ip() to replace this? NewExternalPort=self.port, NewProtocol=protocol, NewInternalPort=self.port, NewInternalClient=internal_ip, - NewEnabled="1", + NewEnabled=1, NewPortMappingDescription=description, NewLeaseDuration=self._nat_portmap_lifetime, ) - self._mapping = PortMapping( - "%s:%d" % (internal_ip, self.port), "%s:%d" % (external_ip, self.port) - ) - self.logger.info("NAT port forwarding successfully set up: %r", self._mapping) - return external_ip - - async def _discover_upnp_devices(self) -> List[upnpclient.upnp.Device]: - loop = asyncio.get_event_loop() - # Use loop.run_in_executor() because upnpclient.discover() is blocking and may take a - # while to complete. We must use a ThreadPoolExecutor() because the - # response from upnpclient.discover() can't be pickled. - try: - with ThreadPoolExecutor(max_workers=1) as executor: - devices = await self.wait( - loop.run_in_executor(executor, upnpclient.discover), - timeout=UPNP_DISCOVER_TIMEOUT_SECONDS, - ) - except TimeoutError: - self.logger.info("Timeout waiting for UPNP-enabled devices") - return - else: - self.logger.debug("Found %d candidate NAT devices", len(devices)) - # If there are no UPNP devices we can exit early - if not devices: - self.logger.info("No UPNP-enabled devices found") + async def _delete_port_mapping(self): + if not self._service: return - # Now we loop over all of the devices until we find one that we can use. - retv = [] - for device in devices: - try: - device.WANIPConn1 - except AttributeError: - continue - retv.append(device) - return retv + for protocol in ["TCP", "UDP"]: + with suppress(Exception): + await self._service.async_call_action( + "DeletePortMapping", + NewRemoteHost="", + NewExternalPort=self.port, + NewProtocol=protocol, + ) + self.logger.info("Deleted UPnP port mapping") + + + async def _get_external_ip(self) -> Optional[str]: + if not self._service: + return None + + try: + result = await self._service.async_call_action("GetExternalIPAddress") + return result.get("NewExternalIPAddress") + except Exception as e: + self.logger.warning(f"Failed to get external IP: {e}") + return None + + + def _get_internal_ip(self) -> str: + """ + Robust internal IP detection using socket trick. + """ + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + s.connect(("8.8.8.8", 80)) + return s.getsockname()[0] + finally: + s.close() + + +if __name__ == "__main__": + import logging + import argparse + + from quarkchain.utils import Logger + Logger.set_logging_level("info") + + parser = argparse.ArgumentParser(description="Test UPnP NAT port mapping") + parser.add_argument("--port", type=int, default=38291, help="Port to map (default: 38291)") + args = parser.parse_args() + + async def main(): + svc = UPnPService(port=args.port) + + # Test _get_internal_ip + internal_ip = svc._get_internal_ip() + print(f"Internal IP: {internal_ip}") + + # Test _get_external_ip (without UPnP, falls back to None) + external_ip_before = await svc._get_external_ip() + print(f"External IP (before discover): {external_ip_before}") + + # Test UPnP discover + port mapping + print(f"\nDiscovering UPnP devices (timeout {UPNP_DISCOVER_TIMEOUT_SECONDS}s)...") + external_ip = await svc.discover() + if external_ip: + print(f"External IP: {external_ip}") + print(f"Port {args.port} mapped successfully") + input("Press Enter to remove mapping and exit...") + await svc.stop() + print("Mapping removed") + else: + print("UPnP discovery failed - no suitable device found") + + asyncio.run(main()) diff --git a/quarkchain/p2p/p2p_server.py b/quarkchain/p2p/p2p_server.py index 2a0675552..6e518651e 100644 --- a/quarkchain/p2p/p2p_server.py +++ b/quarkchain/p2p/p2p_server.py @@ -102,7 +102,7 @@ async def _run(self) -> None: self.logger.info("Running server...") mapped_external_ip = None if self.upnp_service: - mapped_external_ip = await self.upnp_service.add_nat_portmap() + mapped_external_ip = await self.upnp_service.discover() external_ip = mapped_external_ip or "0.0.0.0" await self._start_tcp_listener() self.logger.info( diff --git a/quarkchain/p2p/tests/test_nat.py b/quarkchain/p2p/tests/test_nat.py new file mode 100644 index 000000000..fad9cdbfa --- /dev/null +++ b/quarkchain/p2p/tests/test_nat.py @@ -0,0 +1,483 @@ +import asyncio +import socket +import pytest +from unittest.mock import MagicMock, AsyncMock, patch + +from quarkchain.p2p.cancel_token.token import OperationCancelled + +# CancelToken uses deprecated asyncio.Event(loop=loop) removed in py3.10+, +# patch it to ignore the loop kwarg +import quarkchain.p2p.cancel_token.token as _ct + +_OrigCancelToken = _ct.CancelToken + + +class _PatchedCancelToken(_OrigCancelToken): + def __init__(self, name, loop=None): + self.name = name + self._chain = [] + self._triggered = asyncio.Event() + self._loop = None + + +_ct.CancelToken = _PatchedCancelToken + +from quarkchain.p2p.nat import UPnPService + +from quarkchain.utils import Logger +if not Logger._qkc_logger: + Logger.set_logging_level("warning") + + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +MOCK_EXTERNAL_IP = "198.51.100.1" +MOCK_INTERNAL_IP = "192.168.1.100" +MOCK_DEVICE_URL = "http://192.168.1.1:5000/mock-device.xml" + + +# --------------------------------------------------------------------------- +# Helpers / Fixtures +# --------------------------------------------------------------------------- + +def _make_mock_service(external_ip=MOCK_EXTERNAL_IP): + """Create a mock WANIPConnection service.""" + service = MagicMock() + service.service_type = "urn:schemas-upnp-org:service:WANIPConnection:1" + + async def async_call_action(action_name, **kwargs): + if action_name == "GetExternalIPAddress": + return {"NewExternalIPAddress": external_ip} + return {} + + service.async_call_action = AsyncMock(side_effect=async_call_action) + return service + + +@pytest.fixture +def mock_socket(): + with patch("quarkchain.p2p.nat.socket") as m: + sock = MagicMock() + sock.getsockname.return_value = (MOCK_INTERNAL_IP, 12345) + m.AF_INET = socket.AF_INET + m.SOCK_DGRAM = socket.SOCK_DGRAM + m.socket.return_value = sock + yield m, sock + + +@pytest.fixture +def mock_aiohttp(): + with patch("quarkchain.p2p.nat.aiohttp") as m: + session = MagicMock() + session.close = AsyncMock() + m.ClientSession.return_value = session + yield m, session + + +def _fake_wait_after(svc, iterations): + """Return a fake wait that cancels after N iterations.""" + call_count = 0 + + async def fake_wait(awaitable, timeout=None): + nonlocal call_count + await awaitable + call_count += 1 + if call_count >= iterations: + svc.cancel_token.trigger() + raise OperationCancelled("test done") + + return fake_wait + + +# --------------------------------------------------------------------------- +# _get_internal_ip +# --------------------------------------------------------------------------- + +def test_get_internal_ip(mock_socket): + _, sock = mock_socket + svc = UPnPService(port=30303) + + assert svc._get_internal_ip() == MOCK_INTERNAL_IP + sock.connect.assert_called_once_with(("8.8.8.8", 80)) + sock.close.assert_called_once() + + +# --------------------------------------------------------------------------- +# _get_external_ip +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_get_external_ip_with_service(): + svc = UPnPService(port=30303) + svc._service = _make_mock_service(MOCK_EXTERNAL_IP) + + assert await svc._get_external_ip() == MOCK_EXTERNAL_IP + + +@pytest.mark.asyncio +async def test_get_external_ip_no_service(): + svc = UPnPService(port=30303) + svc._service = None + + assert await svc._get_external_ip() is None + + +@pytest.mark.asyncio +async def test_get_external_ip_exception_returns_none(): + svc = UPnPService(port=30303) + svc._service = MagicMock() + svc._service.async_call_action = AsyncMock(side_effect=RuntimeError("timeout")) + + assert await svc._get_external_ip() is None + + +# --------------------------------------------------------------------------- +# _add_port_mapping +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_add_port_mapping(mock_socket): + svc = UPnPService(port=30303) + mock_svc = _make_mock_service() + svc._service = mock_svc + + await svc._add_port_mapping() + + assert mock_svc.async_call_action.call_count == 2 + calls = mock_svc.async_call_action.call_args_list + assert calls[0].args[0] == "AddPortMapping" + assert calls[0].kwargs["NewProtocol"] == "TCP" + assert calls[0].kwargs["NewInternalClient"] == MOCK_INTERNAL_IP + assert calls[0].kwargs["NewExternalPort"] == 30303 + assert calls[1].args[0] == "AddPortMapping" + assert calls[1].kwargs["NewProtocol"] == "UDP" + + +# --------------------------------------------------------------------------- +# _delete_port_mapping +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_delete_port_mapping(): + svc = UPnPService(port=30303) + mock_svc = _make_mock_service() + svc._service = mock_svc + + await svc._delete_port_mapping() + + delete_calls = [ + c for c in mock_svc.async_call_action.call_args_list + if c.args[0] == "DeletePortMapping" + ] + assert len(delete_calls) == 2 + assert delete_calls[0].kwargs["NewProtocol"] == "TCP" + assert delete_calls[1].kwargs["NewProtocol"] == "UDP" + + +@pytest.mark.asyncio +async def test_delete_port_mapping_no_service(): + svc = UPnPService(port=30303) + svc._service = None + # Should not raise + await svc._delete_port_mapping() + + +# --------------------------------------------------------------------------- +# _close_session +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_close_session(): + svc = UPnPService(port=30303) + mock_session = MagicMock() + mock_session.close = AsyncMock() + svc._session = mock_session + + await svc._close_session() + + mock_session.close.assert_awaited_once() + assert svc._session is None + + +@pytest.mark.asyncio +async def test_close_session_already_none(): + svc = UPnPService(port=30303) + svc._session = None + # Should not raise + await svc._close_session() + + +# --------------------------------------------------------------------------- +# discover (end-to-end with mocked deps) +# --------------------------------------------------------------------------- + +@patch("quarkchain.p2p.nat.UpnpFactory") +@patch("quarkchain.p2p.nat.AiohttpSessionRequester") +@patch("quarkchain.p2p.nat.async_search") +@pytest.mark.asyncio +async def test_discover_success(mock_async_search, mock_requester_cls, + mock_factory_cls, mock_socket, mock_aiohttp): + _, session = mock_aiohttp + mock_wan_service = _make_mock_service(MOCK_EXTERNAL_IP) + + fake_device = MagicMock() + fake_device.services = {"WANIPConn1": mock_wan_service} + + mock_factory = mock_factory_cls.return_value + mock_factory.async_create_device = AsyncMock(return_value=fake_device) + + async def fake_search(on_response, timeout=30): + response = MagicMock() + response.location = MOCK_DEVICE_URL + await on_response(response) + + mock_async_search.side_effect = fake_search + + svc = UPnPService(port=30303) + external_ip = await svc.discover() + + assert external_ip == MOCK_EXTERNAL_IP + mock_factory.async_create_device.assert_awaited_once_with( + MOCK_DEVICE_URL + ) + # 2x AddPortMapping (TCP+UDP) + 1x GetExternalIPAddress + assert mock_wan_service.async_call_action.call_count == 3 + + +@patch("quarkchain.p2p.nat.async_search") +@pytest.mark.asyncio +async def test_discover_no_device(mock_async_search, mock_aiohttp): + _, session = mock_aiohttp + mock_async_search.side_effect = AsyncMock() + + svc = UPnPService(port=30303) + result = await svc.discover() + + assert result is None + session.close.assert_awaited_once() + + +@patch("quarkchain.p2p.nat.UpnpFactory") +@patch("quarkchain.p2p.nat.AiohttpSessionRequester") +@patch("quarkchain.p2p.nat.async_search") +@pytest.mark.asyncio +async def test_discover_skips_device_without_wanipconn(mock_async_search, + mock_requester_cls, + mock_factory_cls, + mock_aiohttp): + _, session = mock_aiohttp + # Device has no WANIPConn service + fake_device = MagicMock() + non_wan_service = MagicMock() + non_wan_service.service_type = "urn:schemas-upnp-org:service:Layer3Forwarding:1" + fake_device.services = {"L3Fwd": non_wan_service} + + mock_factory = mock_factory_cls.return_value + mock_factory.async_create_device = AsyncMock(return_value=fake_device) + + async def fake_search(on_response, timeout=30): + response = MagicMock() + response.location = MOCK_DEVICE_URL + await on_response(response) + + mock_async_search.side_effect = fake_search + + svc = UPnPService(port=30303) + result = await svc.discover() + + assert result is None + session.close.assert_awaited_once() + + +@patch("quarkchain.p2p.nat.UpnpFactory") +@patch("quarkchain.p2p.nat.AiohttpSessionRequester") +@patch("quarkchain.p2p.nat.async_search") +@pytest.mark.asyncio +async def test_discover_ignores_device_creation_error(mock_async_search, + mock_requester_cls, + mock_factory_cls, + mock_aiohttp): + _, session = mock_aiohttp + mock_factory = mock_factory_cls.return_value + mock_factory.async_create_device = AsyncMock( + side_effect=RuntimeError("connection refused") + ) + + async def fake_search(on_response, timeout=30): + response = MagicMock() + response.location = MOCK_DEVICE_URL + await on_response(response) + + mock_async_search.side_effect = fake_search + + svc = UPnPService(port=30303) + result = await svc.discover() + + assert result is None + session.close.assert_awaited_once() + + +# --------------------------------------------------------------------------- +# stop +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_stop(): + svc = UPnPService(port=30303) + mock_svc = _make_mock_service() + svc._service = mock_svc + mock_session = MagicMock() + mock_session.close = AsyncMock() + svc._session = mock_session + + await svc.stop() + + delete_calls = [ + c for c in mock_svc.async_call_action.call_args_list + if c.args[0] == "DeletePortMapping" + ] + assert len(delete_calls) == 2 + mock_session.close.assert_awaited_once() + assert svc._session is None + + +# --------------------------------------------------------------------------- +# _run +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_run_refreshes_port_mapping(mock_socket): + svc = UPnPService(port=30303) + svc._service = _make_mock_service() + svc._nat_portmap_lifetime = 0 + svc.events.started.set() + svc.wait = _fake_wait_after(svc, iterations=2) + + await svc._run() + + add_calls = [ + c for c in svc._service.async_call_action.call_args_list + if c.args[0] == "AddPortMapping" + ] + # TCP + UDP from one refresh (second iteration cancels before _add_port_mapping) + assert len(add_calls) == 2 + + +@pytest.mark.asyncio +async def test_run_skips_mapping_without_service(): + svc = UPnPService(port=30303) + svc._service = None + svc._nat_portmap_lifetime = 0 + svc.events.started.set() + svc.wait = _fake_wait_after(svc, iterations=1) + svc._add_port_mapping = AsyncMock() + + await svc._run() + + svc._add_port_mapping.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_run_exits_on_cancel(): + svc = UPnPService(port=30303) + svc._service = _make_mock_service() + svc._nat_portmap_lifetime = 0 + svc.events.started.set() + + async def fake_wait(awaitable, timeout=None): + awaitable.close() # prevent "coroutine never awaited" warning + raise OperationCancelled("cancelled") + + svc.wait = fake_wait + + await svc._run() + + assert svc._service.async_call_action.call_count == 0 + + +@pytest.mark.asyncio +async def test_run_continues_on_exception(mock_socket): + svc = UPnPService(port=30303) + svc._service = _make_mock_service() + svc._nat_portmap_lifetime = 0 + svc.events.started.set() + svc.wait = _fake_wait_after(svc, iterations=3) + + original_add = svc._add_port_mapping + attempt = 0 + + async def flaky_add(): + nonlocal attempt + attempt += 1 + if attempt == 1: + raise RuntimeError("temporary failure") + await original_add() + + svc._add_port_mapping = flaky_add + + await svc._run() + + assert attempt == 2 + + +# --------------------------------------------------------------------------- +# Full lifecycle: discover -> _run refresh -> stop +# --------------------------------------------------------------------------- + +@patch("quarkchain.p2p.nat.UpnpFactory") +@patch("quarkchain.p2p.nat.AiohttpSessionRequester") +@patch("quarkchain.p2p.nat.async_search") +@pytest.mark.asyncio +async def test_full_lifecycle(mock_async_search, mock_requester_cls, + mock_factory_cls, mock_socket, mock_aiohttp): + """Test the complete lifecycle: discover -> _run refresh -> stop.""" + _, session = mock_aiohttp + mock_wan_service = _make_mock_service(MOCK_EXTERNAL_IP) + + fake_device = MagicMock() + fake_device.services = {"WANIPConn1": mock_wan_service} + + mock_factory = mock_factory_cls.return_value + mock_factory.async_create_device = AsyncMock(return_value=fake_device) + + async def fake_search(on_response, timeout=30): + response = MagicMock() + response.location = MOCK_DEVICE_URL + await on_response(response) + + mock_async_search.side_effect = fake_search + + svc = UPnPService(port=38291) + + # Phase 1: discover — should find service and add initial port mapping + external_ip = await svc.discover() + + assert external_ip == MOCK_EXTERNAL_IP + assert svc._service is mock_wan_service + assert svc._session is not None + + all_calls = mock_wan_service.async_call_action.call_args_list + add_calls = [c for c in all_calls if c.args[0] == "AddPortMapping"] + assert len(add_calls) == 2 # TCP + UDP from discover + + # Phase 2: _run refresh — simulate one refresh cycle + svc._nat_portmap_lifetime = 0 + svc.events.started.set() + svc.wait = _fake_wait_after(svc, iterations=2) + + await svc._run() + + all_calls = mock_wan_service.async_call_action.call_args_list + add_calls = [c for c in all_calls if c.args[0] == "AddPortMapping"] + assert len(add_calls) == 4 # 2 from discover + 2 from refresh + + # Phase 3: stop — should delete mappings and close session + await svc.stop() + + all_calls = mock_wan_service.async_call_action.call_args_list + delete_calls = [c for c in all_calls if c.args[0] == "DeletePortMapping"] + assert len(delete_calls) == 2 # TCP + UDP + session.close.assert_awaited_once() + assert svc._session is None