Skip to content

Commit

Permalink
Fix port exhaustion from http notifies
Browse files Browse the repository at this point in the history
  • Loading branch information
Tjstretchalot committed Jan 14, 2025
1 parent dfff4e4 commit 7c9b42d
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 21 deletions.
72 changes: 70 additions & 2 deletions src/lonelypss/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
Union,
)

import aiohttp
from lonelypsp.auth.config import AuthConfig
from lonelypsp.compat import fast_dataclass
from lonelypsp.stateful.messages.configure import S2B_Configure
Expand Down Expand Up @@ -776,8 +777,62 @@ async def train_compression_dict_high_watermark(
return (zdict, 10)


class NotifySessionConfig(Protocol):
async def setup_http_notify_client_session(self, config: "Config") -> None:
"""Called to initialize the aiohttp.ClientSession used when receiving
NOTIFY messages from the http endpoint
"""

@property
def http_notify_client_session(self) -> aiohttp.ClientSession:
"""The aiohttp.ClientSession used when receiving NOTIFY messages from the http endpoint.
Should raise an error if not setup
"""
...

async def teardown_http_notify_client_session(self) -> None:
"""Called to close the aiohttp.ClientSession used when receiving
NOTIFY messages from the http endpoint
"""


class NotifySessionStandard:
"""Standard implementation of NotifySessionConfig"""

def __init__(self) -> None:
self.client_session: Optional[aiohttp.ClientSession] = None

async def setup_http_notify_client_session(self, config: "Config") -> None:
assert self.client_session is None, "already setup"
self.client_session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(
total=config.outgoing_http_timeout_total,
connect=config.outgoing_http_timeout_connect,
sock_read=config.outgoing_http_timeout_sock_read,
sock_connect=config.outgoing_http_timeout_sock_connect,
)
)

@property
def http_notify_client_session(self) -> aiohttp.ClientSession:
assert self.client_session is not None, "not setup"
return self.client_session

async def teardown_http_notify_client_session(self) -> None:
assert self.client_session is not None, "not setup"
sess = self.client_session
self.client_session = None
await sess.close()


class Config(
AuthConfig, DBConfig, GenericConfig, MissedRetryConfig, CompressionConfig, Protocol
AuthConfig,
DBConfig,
GenericConfig,
MissedRetryConfig,
CompressionConfig,
NotifySessionConfig,
Protocol,
):
"""The injected behavior required for the lonelypss to operate. This is
generally generated for you using one of the templates, see the readme for details
Expand All @@ -794,12 +849,14 @@ def __init__(
generic: GenericConfig,
missed: MissedRetryConfig,
compression: CompressionConfig,
notify_session: NotifySessionConfig,
):
self.auth = auth
self.db = db
self.generic = generic
self.missed = missed
self.compression = compression
self.notify_session_config = notify_session

async def setup_to_broadcaster_auth(self) -> None:
await self.auth.setup_to_broadcaster_auth()
Expand Down Expand Up @@ -1160,9 +1217,20 @@ def websocket_minimal_headers(self) -> bool:
def sweep_missed_interval(self) -> float:
return self.generic.sweep_missed_interval

async def setup_http_notify_client_session(self, config: "Config") -> None:
await self.notify_session_config.setup_http_notify_client_session(config)

@property
def http_notify_client_session(self) -> aiohttp.ClientSession:
return self.notify_session_config.http_notify_client_session

async def teardown_http_notify_client_session(self) -> None:
await self.notify_session_config.teardown_http_notify_client_session()


if TYPE_CHECKING:
_a: Type[GenericConfig] = GenericConfigFromValues
_b: Type[CompressionConfig] = CompressionConfigFromParts
_c: Type[MissedRetryConfig] = MissedRetryStandard
_d: Type[Config] = ConfigFromParts
_d: Type[NotifySessionConfig] = NotifySessionStandard
_e: Type[Config] = ConfigFromParts
14 changes: 11 additions & 3 deletions src/lonelypss/config/lifespan.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ async def setup_config(config: Config) -> None:
await config.setup_to_subscriber_auth()
try:
await config.setup_db()
try:
await config.setup_http_notify_client_session(config)
except BaseException:
await config.teardown_db()
raise
except BaseException:
await config.teardown_to_subscriber_auth()
raise
Expand All @@ -19,9 +24,12 @@ async def setup_config(config: Config) -> None:
async def teardown_config(config: Config) -> None:
"""Convenience function to teardown the configuration (similiar idea to aenter)"""
try:
await config.teardown_db()
await config.teardown_http_notify_client_session()
finally:
try:
await config.teardown_to_subscriber_auth()
await config.teardown_db()
finally:
await config.teardown_to_broadcaster_auth()
try:
await config.teardown_to_subscriber_auth()
finally:
await config.teardown_to_broadcaster_auth()
2 changes: 2 additions & 0 deletions src/lonelypss/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ def setup_locally(
ConfigFromParts,
GenericConfigFromValues,
MissedRetryStandard,
NotifySessionStandard,
)
from lonelypss.config.lifespan import setup_config, teardown_config
from lonelypss.middleware.config import ConfigMiddleware
Expand Down Expand Up @@ -278,6 +279,7 @@ def _make_config() -> Config:{load_auth_secrets}
compression_retrain_interval_seconds=60 * 60 * 60,
decompression_max_window_size=8 * 1024 * 1024,
),
notify_session=NotifySessionStandard(),
)
Expand Down
24 changes: 8 additions & 16 deletions src/lonelypss/routes/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,22 +141,14 @@ async def notify(
return Response(status_code=400)

request_body.seek(2 + topic_length + 64 + 8)
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(
total=config.outgoing_http_timeout_total,
connect=config.outgoing_http_timeout_connect,
sock_read=config.outgoing_http_timeout_sock_read,
sock_connect=config.outgoing_http_timeout_sock_connect,
)
) as session:
notify_result = await handle_trusted_notify(
topic,
request_body,
config=config,
session=session,
content_length=message_length,
sha512=actual_hash,
)
notify_result = await handle_trusted_notify(
topic,
request_body,
config=config,
session=config.http_notify_client_session,
content_length=message_length,
sha512=actual_hash,
)

if notify_result.type == TrustedNotifyResultType.UNAVAILABLE:
return Response(status_code=503)
Expand Down

0 comments on commit 7c9b42d

Please sign in to comment.