Skip to content

Commit

Permalink
fix not checking ack auth
Browse files Browse the repository at this point in the history
  • Loading branch information
Tjstretchalot committed Jan 26, 2025
1 parent 1f85203 commit 0906e7c
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 33 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ classifiers = [
dependencies = [
"fastapi>=0.115",
"aiohttp>=3.11",
"lonelypsp>=0.0.38"
"lonelypsp>=0.0.40"
]
requires-python = ">=3.9"

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ colorama==0.4.6
fastapi==0.115.5
frozenlist==1.5.0
idna==3.10
lonelypsp==0.0.39
lonelypsp==0.0.40
multidict==6.1.0
mypy==1.13.0
mypy-extensions==1.0.0
Expand Down
4 changes: 2 additions & 2 deletions src/lonelypss/routes/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ async def handle_trusted_notify(

try:
try:
parsed_resp = await _parse_notify_response(resp.content)
parsed_resp = await _parse_receive_response(resp.content)
finally:
await raw_resp.__aexit__(None, None, None)
except Exception:
Expand Down Expand Up @@ -559,7 +559,7 @@ class _ReceiveResponseConfirmReceive:
num_subscribers: int


async def _parse_notify_response(
async def _parse_receive_response(
rdr: AsyncReadableBytesIO,
) -> Union[_ReceiveResponseUnsubscribeImmediate, _ReceiveResponseConfirmReceive]:
"""Raises ValueError typically if the body is bad"""
Expand Down
29 changes: 0 additions & 29 deletions src/lonelypss/ws/handlers/open/check_read_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,6 @@ async def check_read_task(state: StateOpen) -> CheckResult:
message = S2B_AnyMessageParser.parse(prefix.flags, prefix.type, payload_reader)
state.read_task = make_websocket_read_task(state.websocket)

# fast track acks as they are common and can be handled synchronously
if message.type == SubscriberToBroadcasterStatefulMessageType.CONFIRM_RECEIVE:
expected_ack = state.expecting_acks.get_nowait()
if expected_ack.type != message.type:
raise Exception(
f"unexpected confirm receive (expecting a {expected_ack.type})"
)
if expected_ack.identifier != message.identifier:
raise Exception(
f"unexpected confirm receive (expecting identifier {expected_ack.identifier!r}, got {message.identifier!r})"
)
return CheckResult.RESTART

if message.type == SubscriberToBroadcasterStatefulMessageType.CONTINUE_RECEIVE:
expected_ack = state.expecting_acks.get_nowait()
if expected_ack.type != message.type:
raise Exception(
f"unexpected continue receive (expecting a {expected_ack.type})"
)
if expected_ack.identifier != message.identifier:
raise Exception(
f"unexpected continue receive (expecting identifier {expected_ack.identifier!r}, got {message.identifier!r})"
)
if expected_ack.part_id != message.part_id:
raise Exception(
f"unexpected continue receive (expecting part_id {expected_ack.part_id}, got {message.part_id})"
)
return CheckResult.RESTART

if state.process_task is None:
state.process_task = asyncio.create_task(process_any(state, message))
else:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import time
from typing import TYPE_CHECKING

from lonelypsp.auth.config import AuthResult
from lonelypsp.stateful.messages.confirm_receive import S2B_ConfirmReceive

from lonelypss.ws.handlers.open.processors.protocol import S2B_MessageProcessor
from lonelypss.ws.handlers.open.websocket_url import (
make_for_receive_websocket_url_and_change_counter,
)
from lonelypss.ws.state import StateOpen


async def process_confirm_receive(
state: StateOpen, message: S2B_ConfirmReceive
) -> None:
expected_ack = state.expecting_acks.get_nowait()
if expected_ack.type != message.type:
raise Exception(f"unexpected confirm receive (expecting a {expected_ack.type})")
if expected_ack.identifier != message.identifier:
raise Exception(
f"unexpected confirm receive (expecting identifier {expected_ack.identifier!r}, got {message.identifier!r})"
)

receive_url = make_for_receive_websocket_url_and_change_counter(state)
auth_result = await state.broadcaster_config.is_confirm_receive_allowed(
tracing=message.tracing,
identifier=message.identifier,
num_subscribers=message.num_subscribers,
url=receive_url,
now=time.time(),
authorization=message.authorization,
)
if auth_result != AuthResult.OK:
raise Exception(f"confirm receive auth: {auth_result}")


if TYPE_CHECKING:
_: S2B_MessageProcessor[S2B_ConfirmReceive] = process_confirm_receive
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import time
from typing import TYPE_CHECKING

from lonelypsp.auth.config import AuthResult
from lonelypsp.stateful.messages.continue_receive import S2B_ContinueReceive

from lonelypss.ws.handlers.open.processors.protocol import S2B_MessageProcessor
from lonelypss.ws.handlers.open.websocket_url import (
make_for_receive_websocket_url_and_change_counter,
)
from lonelypss.ws.state import StateOpen


async def process_continue_receive(
state: StateOpen, message: S2B_ContinueReceive
) -> None:
expected_ack = state.expecting_acks.get_nowait()
if expected_ack.type != message.type:
raise Exception(
f"unexpected continue receive (expecting a {expected_ack.type})"
)
if expected_ack.identifier != message.identifier:
raise Exception(
f"unexpected continue receive (expecting identifier {expected_ack.identifier!r}, got {message.identifier!r})"
)
if expected_ack.part_id != message.part_id:
raise Exception(
f"unexpected continue receive (expecting part_id {expected_ack.part_id}, got {message.part_id})"
)

receive_url = make_for_receive_websocket_url_and_change_counter(state)
auth_result = await state.broadcaster_config.is_stateful_continue_receive_allowed(
url=receive_url,
message=message,
now=time.time(),
)
if auth_result != AuthResult.OK:
raise Exception(f"continue receive auth: {auth_result}")


if TYPE_CHECKING:
_: S2B_MessageProcessor[S2B_ContinueReceive] = process_continue_receive
8 changes: 8 additions & 0 deletions src/lonelypss/ws/handlers/open/processors/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
from lonelypsp.stateful.constants import SubscriberToBroadcasterStatefulMessageType
from lonelypsp.stateful.message import S2B_Message

from lonelypss.ws.handlers.open.processors.process_confirm_receive import (
process_confirm_receive,
)
from lonelypss.ws.handlers.open.processors.process_continue_receive import (
process_continue_receive,
)
from lonelypss.ws.handlers.open.processors.process_notify import process_notify
from lonelypss.ws.handlers.open.processors.process_notify_stream import (
process_notify_stream,
Expand All @@ -29,6 +35,8 @@
SubscriberToBroadcasterStatefulMessageType.SUBSCRIBE_GLOB: process_subscribe_glob,
SubscriberToBroadcasterStatefulMessageType.UNSUBSCRIBE_EXACT: process_unsubscribe_exact,
SubscriberToBroadcasterStatefulMessageType.UNSUBSCRIBE_GLOB: process_unsubscribe_glob,
SubscriberToBroadcasterStatefulMessageType.CONFIRM_RECEIVE: process_confirm_receive,
SubscriberToBroadcasterStatefulMessageType.CONTINUE_RECEIVE: process_continue_receive,
}


Expand Down
1 change: 1 addition & 0 deletions src/lonelypss/ws/handlers/open/send_receive_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ async def send_receive_stream_given_first_headers(
identifier=identifier,
authorization=None,
tracing=b"",
num_subscribers=1,
)
if is_done
else S2B_ContinueReceive(
Expand Down

0 comments on commit 0906e7c

Please sign in to comment.