Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 148 additions & 123 deletions quarkchain/p2p/nat.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,21 @@

import aiohttp
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR introduces a new runtime dependency on async-upnp-client (and directly imports aiohttp in nat.py), but the project dependencies still only list upnpclient and netifaces.

Right now requirements.txt / setup.py are not updated to install the new dependency set, so a clean environment will fail at import time before this code can run.

It looks like we should add async-upnp-client (and ensure aiohttp is available through project dependencies), while removing the old UPnP-specific dependencies that are no longer used.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be resolved after merge other changes. Test can be run on branch upgrade-py3-13.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t think this is a good practice, as additional bugs may be introduced during the merge process.

Also, have you run the full test suite again on upgrade-py3-13?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All changes will be made on the upgrade-py3-13 branch. After all tests pass, they will be cherry-picked to the target branch. Once the merge is completed, the full test suite will be executed again. I will also perform a comparison between the upgrade/base-line branch and the upgrade-py3-13 branch to ensure consistency and correctness.

import asyncio
from concurrent.futures import ThreadPoolExecutor
import ipaddress
from typing import List, NamedTuple
from urllib.parse import urlparse
import socket
from contextlib import suppress
from typing import Optional

from quarkchain.p2p.cancel_token.token import CancelToken, OperationCancelled
from quarkchain.p2p.exceptions import NoInternalAddressMatchesDevice
from quarkchain.p2p.service import BaseService
import netifaces
import upnpclient

from async_upnp_client.aiohttp import AiohttpSessionRequester
from async_upnp_client.client_factory import UpnpFactory
from async_upnp_client.search import async_search

# UPnP discovery can take a long time, so use a loooong timeout here.
UPNP_DISCOVER_TIMEOUT_SECONDS = 30


PortMapping = NamedTuple(
"PortMapping",
[
("internal", str), # of the form "192.2.3.4:56"
("external", str), # of the form "192.2.3.4:56"
],
)


def find_internal_ip_on_device_network(upnp_dev: upnpclient.upnp.Device) -> str:
"""
For a given UPnP device, return the internal IP address of this host machine that can
be used for a NAT mapping.
"""
parsed_url = urlparse(upnp_dev.location)
# Get an ipaddress.IPv4Network instance for the upnp device's network.
upnp_dev_net = ipaddress.ip_network(parsed_url.hostname + "/24", strict=False)
for iface in netifaces.interfaces():
for family, addresses in netifaces.ifaddresses(iface).items():
# TODO: Support IPv6 addresses as well.
if family != netifaces.AF_INET:
continue
for item in addresses:
if ipaddress.ip_address(item["addr"]) in upnp_dev_net:
return item["addr"]
raise NoInternalAddressMatchesDevice(device_hostname=parsed_url.hostname)


class UPnPService(BaseService):
"""
Generate a mapping of external network IP address/port to internal IP address/port,
Expand All @@ -58,9 +31,46 @@ def __init__(self, port: int, token: CancelToken = None) -> None:
"""
super().__init__(token)
self.port = port
self._mapping = (
None
) # : PortMapping when called externally, this never returns None
self._session = None
self._service = None


# -----------------------------
# Public API
# -----------------------------

async def discover(self) -> Optional[str]:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the error-handling behavior here is still riskier than the previous implementation.

Before this refactor, NAT setup was effectively best-effort: add_nat_portmap() logged common UPnP failures and returned None, so the server could still start even if port mapping failed. In the new flow, BaseServer._run() directly awaits self.upnp_service.discover(), and discover() only handles the “no WANIP service found” case gracefully.

If _discover() raises, or if _add_port_mapping() fails after a service has already been found, that exception now bubbles out of discover() and can abort server startup. In that failure path, the session also does not appear to be closed before the exception escapes.

It would be safer to preserve the previous best-effort behavior here: log the NAT setup failure, clean up the session, and return None so startup can continue without UPnP.

"""
Discover router and create initial port mapping.
Returns external IP if successful.
"""
self._session = aiohttp.ClientSession()
await self._discover(self._session)

if not self._service:
self.logger.warning("No UPnP WANIP service found")
await self._close_session()
return None

await self._add_port_mapping()

return await self._get_external_ip()


async def stop(self):
await self._delete_port_mapping()
await self._close_session()

async def _close_session(self):
if self._session:
await self._session.close()
self._session = None



# -----------------------------
# Internal logic
# -----------------------------

async def _run(self) -> None:
"""Run an infinite loop refreshing our NAT port mapping.
Expand All @@ -72,112 +82,127 @@ async def _run(self) -> None:
try:
# Wait for the port mapping lifetime, and then try registering it again
await self.wait(asyncio.sleep(self._nat_portmap_lifetime))
await self.add_nat_portmap()
if self._service:
await self._add_port_mapping()
except OperationCancelled:
break
except Exception:
self.logger.exception("Failed to setup NAT portmap")

async def add_nat_portmap(self) -> str:
"""
Set up the port mapping

:return: the IP address of the new mapping (or None if failed)
"""
self.logger.info("Setting up NAT portmap...")
try:
devices = await self._discover_upnp_devices()
if devices:
self.logger.info(
"Adding NAT port map on {} devices...".format(len(devices))
)
for upnp_dev in devices:
try:
external_ip = self._add_nat_portmap(upnp_dev)
except NoInternalAddressMatchesDevice as exc:
self.logger.info(
"No internal addresses were managed by the UPnP device at %s",
exc.device_hostname,
)
continue
else:
return external_ip
except upnpclient.soap.SOAPError as e:
if e.args == (718, "ConflictInMappingEntry"):
# An entry already exists with the parameters we specified. Maybe the router
# didn't clean it up after it expired or it has been configured by other piece
# of software, either way we should not override it.
# https://tools.ietf.org/id/draft-ietf-pcp-upnp-igd-interworking-07.html#errors
self.logger.info(
"NAT port mapping already configured, not overriding it"
)
else:
self.logger.exception("Failed to setup NAT portmap")
async def _discover(self, session):
requester = AiohttpSessionRequester(session)
factory = UpnpFactory(requester)

self.logger.warning("No NAT mapping has been set")
self._mapping = None
return None
async def on_response(response):
try:
device = await factory.async_create_device(response.location)

def current_mapping(self) -> PortMapping:
if self._mapping is None:
unbound = ":%d" % self.port
return PortMapping(unbound, unbound)
else:
return self._mapping
for service in device.services.values():
if "WANIPConn" in service.service_type:
self._service = service
self.logger.info("Found UPnP WANIP service")
return
except Exception as e:
self.logger.debug(f"Ignoring device: {e}")

await async_search(on_response, timeout=UPNP_DISCOVER_TIMEOUT_SECONDS)

def _add_nat_portmap(self, upnp_dev: upnpclient.upnp.Device) -> str:
# Detect our internal IP address (which raises if there are no matches)
internal_ip = find_internal_ip_on_device_network(upnp_dev)

external_ip = upnp_dev.WANIPConn1.GetExternalIPAddress()["NewExternalIPAddress"]
async def _add_port_mapping(self):
internal_ip = self._get_internal_ip()

self.logger.info(
f"Adding port mapping {self.port}->{internal_ip}:{self.port}"
)

for protocol, description in [
("TCP", "ethereum p2p"),
("UDP", "ethereum discovery"),
]:
upnp_dev.WANIPConn1.AddPortMapping(
NewRemoteHost=external_ip,
await self._service.async_call_action(
"AddPortMapping",
NewRemoteHost="", # should we use _get_external_ip() to replace this?
NewExternalPort=self.port,
NewProtocol=protocol,
NewInternalPort=self.port,
NewInternalClient=internal_ip,
NewEnabled="1",
NewEnabled=1,
NewPortMappingDescription=description,
NewLeaseDuration=self._nat_portmap_lifetime,
)
self._mapping = PortMapping(
"%s:%d" % (internal_ip, self.port), "%s:%d" % (external_ip, self.port)
)
self.logger.info("NAT port forwarding successfully set up: %r", self._mapping)
return external_ip

async def _discover_upnp_devices(self) -> List[upnpclient.upnp.Device]:
loop = asyncio.get_event_loop()
# Use loop.run_in_executor() because upnpclient.discover() is blocking and may take a
# while to complete. We must use a ThreadPoolExecutor() because the
# response from upnpclient.discover() can't be pickled.
try:
with ThreadPoolExecutor(max_workers=1) as executor:
devices = await self.wait(
loop.run_in_executor(executor, upnpclient.discover),
timeout=UPNP_DISCOVER_TIMEOUT_SECONDS,
)
except TimeoutError:
self.logger.info("Timeout waiting for UPNP-enabled devices")
return
else:
self.logger.debug("Found %d candidate NAT devices", len(devices))

# If there are no UPNP devices we can exit early
if not devices:
self.logger.info("No UPNP-enabled devices found")
async def _delete_port_mapping(self):
if not self._service:
return

# Now we loop over all of the devices until we find one that we can use.
retv = []
for device in devices:
try:
device.WANIPConn1
except AttributeError:
continue
retv.append(device)
return retv
for protocol in ["TCP", "UDP"]:
with suppress(Exception):
await self._service.async_call_action(
"DeletePortMapping",
NewRemoteHost="",
NewExternalPort=self.port,
NewProtocol=protocol,
)
self.logger.info("Deleted UPnP port mapping")


async def _get_external_ip(self) -> Optional[str]:
if not self._service:
return None

try:
result = await self._service.async_call_action("GetExternalIPAddress")
return result.get("NewExternalIPAddress")
except Exception as e:
self.logger.warning(f"Failed to get external IP: {e}")
return None


def _get_internal_ip(self) -> str:
"""
Robust internal IP detection using socket trick.
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(("8.8.8.8", 80))
return s.getsockname()[0]
finally:
s.close()


if __name__ == "__main__":
import logging
import argparse

from quarkchain.utils import Logger
Logger.set_logging_level("info")

parser = argparse.ArgumentParser(description="Test UPnP NAT port mapping")
parser.add_argument("--port", type=int, default=38291, help="Port to map (default: 38291)")
args = parser.parse_args()

async def main():
svc = UPnPService(port=args.port)

# Test _get_internal_ip
internal_ip = svc._get_internal_ip()
print(f"Internal IP: {internal_ip}")

# Test _get_external_ip (without UPnP, falls back to None)
external_ip_before = await svc._get_external_ip()
print(f"External IP (before discover): {external_ip_before}")

# Test UPnP discover + port mapping
print(f"\nDiscovering UPnP devices (timeout {UPNP_DISCOVER_TIMEOUT_SECONDS}s)...")
external_ip = await svc.discover()
if external_ip:
print(f"External IP: {external_ip}")
print(f"Port {args.port} mapped successfully")
input("Press Enter to remove mapping and exit...")
await svc.stop()
print("Mapping removed")
else:
print("UPnP discovery failed - no suitable device found")

asyncio.run(main())
2 changes: 1 addition & 1 deletion quarkchain/p2p/p2p_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ async def _run(self) -> None:
self.logger.info("Running server...")
mapped_external_ip = None
if self.upnp_service:
mapped_external_ip = await self.upnp_service.add_nat_portmap()
mapped_external_ip = await self.upnp_service.discover()
external_ip = mapped_external_ip or "0.0.0.0"
await self._start_tcp_listener()
self.logger.info(
Expand Down
Loading
Loading