From 7c9b42dbe6d3435c11a809fcdebe4aeaeb3f9273 Mon Sep 17 00:00:00 2001 From: Timothy Moore Date: Tue, 14 Jan 2025 10:25:03 -0800 Subject: [PATCH] Fix port exhaustion from http notifies --- src/lonelypss/config/config.py | 72 +++++++++++++++++++++++++++++++- src/lonelypss/config/lifespan.py | 14 +++++-- src/lonelypss/main.py | 2 + src/lonelypss/routes/notify.py | 24 ++++------- 4 files changed, 91 insertions(+), 21 deletions(-) diff --git a/src/lonelypss/config/config.py b/src/lonelypss/config/config.py index c6f248c..bed008e 100644 --- a/src/lonelypss/config/config.py +++ b/src/lonelypss/config/config.py @@ -15,6 +15,7 @@ Union, ) +import aiohttp from lonelypsp.auth.config import AuthConfig from lonelypsp.compat import fast_dataclass from lonelypsp.stateful.messages.configure import S2B_Configure @@ -776,8 +777,62 @@ async def train_compression_dict_high_watermark( return (zdict, 10) +class NotifySessionConfig(Protocol): + async def setup_http_notify_client_session(self, config: "Config") -> None: + """Called to initialize the aiohttp.ClientSession used when receiving + NOTIFY messages from the http endpoint + """ + + @property + def http_notify_client_session(self) -> aiohttp.ClientSession: + """The aiohttp.ClientSession used when receiving NOTIFY messages from the http endpoint. + Should raise an error if not setup + """ + ... + + async def teardown_http_notify_client_session(self) -> None: + """Called to close the aiohttp.ClientSession used when receiving + NOTIFY messages from the http endpoint + """ + + +class NotifySessionStandard: + """Standard implementation of NotifySessionConfig""" + + def __init__(self) -> None: + self.client_session: Optional[aiohttp.ClientSession] = None + + async def setup_http_notify_client_session(self, config: "Config") -> None: + assert self.client_session is None, "already setup" + self.client_session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout( + total=config.outgoing_http_timeout_total, + connect=config.outgoing_http_timeout_connect, + sock_read=config.outgoing_http_timeout_sock_read, + sock_connect=config.outgoing_http_timeout_sock_connect, + ) + ) + + @property + def http_notify_client_session(self) -> aiohttp.ClientSession: + assert self.client_session is not None, "not setup" + return self.client_session + + async def teardown_http_notify_client_session(self) -> None: + assert self.client_session is not None, "not setup" + sess = self.client_session + self.client_session = None + await sess.close() + + class Config( - AuthConfig, DBConfig, GenericConfig, MissedRetryConfig, CompressionConfig, Protocol + AuthConfig, + DBConfig, + GenericConfig, + MissedRetryConfig, + CompressionConfig, + NotifySessionConfig, + Protocol, ): """The injected behavior required for the lonelypss to operate. This is generally generated for you using one of the templates, see the readme for details @@ -794,12 +849,14 @@ def __init__( generic: GenericConfig, missed: MissedRetryConfig, compression: CompressionConfig, + notify_session: NotifySessionConfig, ): self.auth = auth self.db = db self.generic = generic self.missed = missed self.compression = compression + self.notify_session_config = notify_session async def setup_to_broadcaster_auth(self) -> None: await self.auth.setup_to_broadcaster_auth() @@ -1160,9 +1217,20 @@ def websocket_minimal_headers(self) -> bool: def sweep_missed_interval(self) -> float: return self.generic.sweep_missed_interval + async def setup_http_notify_client_session(self, config: "Config") -> None: + await self.notify_session_config.setup_http_notify_client_session(config) + + @property + def http_notify_client_session(self) -> aiohttp.ClientSession: + return self.notify_session_config.http_notify_client_session + + async def teardown_http_notify_client_session(self) -> None: + await self.notify_session_config.teardown_http_notify_client_session() + if TYPE_CHECKING: _a: Type[GenericConfig] = GenericConfigFromValues _b: Type[CompressionConfig] = CompressionConfigFromParts _c: Type[MissedRetryConfig] = MissedRetryStandard - _d: Type[Config] = ConfigFromParts + _d: Type[NotifySessionConfig] = NotifySessionStandard + _e: Type[Config] = ConfigFromParts diff --git a/src/lonelypss/config/lifespan.py b/src/lonelypss/config/lifespan.py index a58f6df..cc3c40e 100644 --- a/src/lonelypss/config/lifespan.py +++ b/src/lonelypss/config/lifespan.py @@ -8,6 +8,11 @@ async def setup_config(config: Config) -> None: await config.setup_to_subscriber_auth() try: await config.setup_db() + try: + await config.setup_http_notify_client_session(config) + except BaseException: + await config.teardown_db() + raise except BaseException: await config.teardown_to_subscriber_auth() raise @@ -19,9 +24,12 @@ async def setup_config(config: Config) -> None: async def teardown_config(config: Config) -> None: """Convenience function to teardown the configuration (similiar idea to aenter)""" try: - await config.teardown_db() + await config.teardown_http_notify_client_session() finally: try: - await config.teardown_to_subscriber_auth() + await config.teardown_db() finally: - await config.teardown_to_broadcaster_auth() + try: + await config.teardown_to_subscriber_auth() + finally: + await config.teardown_to_broadcaster_auth() diff --git a/src/lonelypss/main.py b/src/lonelypss/main.py index 8295e11..ab18e29 100644 --- a/src/lonelypss/main.py +++ b/src/lonelypss/main.py @@ -226,6 +226,7 @@ def setup_locally( ConfigFromParts, GenericConfigFromValues, MissedRetryStandard, + NotifySessionStandard, ) from lonelypss.config.lifespan import setup_config, teardown_config from lonelypss.middleware.config import ConfigMiddleware @@ -278,6 +279,7 @@ def _make_config() -> Config:{load_auth_secrets} compression_retrain_interval_seconds=60 * 60 * 60, decompression_max_window_size=8 * 1024 * 1024, ), + notify_session=NotifySessionStandard(), ) diff --git a/src/lonelypss/routes/notify.py b/src/lonelypss/routes/notify.py index 00dac56..33cba73 100644 --- a/src/lonelypss/routes/notify.py +++ b/src/lonelypss/routes/notify.py @@ -141,22 +141,14 @@ async def notify( return Response(status_code=400) request_body.seek(2 + topic_length + 64 + 8) - async with aiohttp.ClientSession( - timeout=aiohttp.ClientTimeout( - total=config.outgoing_http_timeout_total, - connect=config.outgoing_http_timeout_connect, - sock_read=config.outgoing_http_timeout_sock_read, - sock_connect=config.outgoing_http_timeout_sock_connect, - ) - ) as session: - notify_result = await handle_trusted_notify( - topic, - request_body, - config=config, - session=session, - content_length=message_length, - sha512=actual_hash, - ) + notify_result = await handle_trusted_notify( + topic, + request_body, + config=config, + session=config.http_notify_client_session, + content_length=message_length, + sha512=actual_hash, + ) if notify_result.type == TrustedNotifyResultType.UNAVAILABLE: return Response(status_code=503)