Skip to content

Commit 3612207

Browse files
authored
Merge branch 'master' into master
2 parents f001953 + a52893e commit 3612207

File tree

9 files changed

+98
-8
lines changed

9 files changed

+98
-8
lines changed

CHANGELOG.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
Changelog
22
---------
33

4+
v0.4.12
5+
=======
6+
- Fixed fragmentation for fire and forget
7+
48
v0.4.11
59
=======
610
- Breaking change: RequestRouter argument 'payload_mapper' was replaced with 'payload_deserializer' and 'payload_serializer'

bugs/165_client.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import asyncio
2+
import logging
3+
from datetime import timedelta
4+
5+
from rsocket.helpers import single_transport_provider
6+
from rsocket.payload import Payload
7+
from rsocket.rsocket_client import RSocketClient
8+
from rsocket.transports.tcp import TransportTCP
9+
10+
11+
async def main(server_port):
12+
logging.info("Connecting to server at localhost:%s", server_port)
13+
14+
connection = await asyncio.open_connection("localhost", server_port)
15+
16+
async with RSocketClient(
17+
single_transport_provider(TransportTCP(*connection)),
18+
fragment_size_bytes=10240,
19+
keep_alive_period=timedelta(seconds=10),
20+
) as client:
21+
# huge_array = bytearray(16777209) # Works
22+
huge_array = bytearray(16777210) # rsocket.exceptions.ParseError: Frame too short: 0 bytes
23+
payload = Payload(huge_array)
24+
await client.fire_and_forget(payload)
25+
await asyncio.sleep(1)
26+
27+
28+
if __name__ == "__main__":
29+
logging.basicConfig(level=logging.DEBUG)
30+
asyncio.run(main(10000))

bugs/165_server.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import asyncio
2+
import logging
3+
4+
from rsocket.helpers import create_future
5+
from rsocket.local_typing import Awaitable
6+
from rsocket.payload import Payload
7+
from rsocket.request_handler import BaseRequestHandler
8+
from rsocket.rsocket_server import RSocketServer
9+
from rsocket.transports.tcp import TransportTCP
10+
11+
12+
class Handler(BaseRequestHandler):
13+
async def request_fire_and_forget(self, payload: Payload) -> Awaitable[Payload]:
14+
print(f"Receiving {len(payload.data)} bytes")
15+
return create_future(Payload(b"OK"))
16+
17+
18+
async def run_server(server_port):
19+
logging.info("Starting server at localhost:%s", server_port)
20+
21+
def session(*connection):
22+
RSocketServer(TransportTCP(*connection), handler_factory=Handler, fragment_size_bytes=10240)
23+
24+
server = await asyncio.start_server(session, "localhost", server_port)
25+
26+
async with server:
27+
await server.serve_forever()
28+
29+
30+
if __name__ == "__main__":
31+
logging.basicConfig(filename="rsocket.log", format="%(asctime)s %(message)s", filemode="w")
32+
asyncio.run(run_server(10000))

bugs/__init__.py

Whitespace-only changes.

docs/conf.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
2525

2626
# The full version, including alpha/beta/rc tags
27-
release = '0.4.11'
27+
release = '0.4.12'
2828

2929
# -- General configuration ---------------------------------------------------
3030

requirements.txt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
Rx==3.2.0
22
aiohttp==3.8.4
3-
aioquic==0.9.20
3+
aioquic==0.9.21
44
asyncclick==8.1.3.4
55
asyncstdlib==3.10.8
66
coverage==6.5.0
77
coveralls==3.3.1
88
decoy==2.0.2
99
flake8==5.0.4
10-
pytest-asyncio==0.21.0
10+
pytest-asyncio==0.21.1
1111
pytest-cov==4.1.0
1212
pytest-profiling==1.7.0
13-
pytest-rerunfailures==11.1.2
13+
pytest-rerunfailures==12.0
1414
pytest-timeout==2.1.0
1515
pytest-xdist==3.3.1
1616
pytest==7.4.0
1717
quart==0.18.4
1818
reactivex==4.0.4
19-
starlette==0.28.0
19+
starlette==0.30.0
2020
cbitstruct==1.0.9
2121
cloudevents==1.9.0
22-
pydantic==1.10.9
22+
pydantic==1.10.11

rsocket/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '0.4.11'
1+
__version__ = '0.4.12'

rsocket/rsocket_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ def fire_and_forget(self, payload: Payload) -> Awaitable[None]:
487487
logger().debug('%s: fire-and-forget: %s', self._log_identifier(), payload)
488488

489489
stream_id = self._allocate_stream()
490-
frame = to_fire_and_forget_frame(stream_id, payload)
490+
frame = to_fire_and_forget_frame(stream_id, payload, self._fragment_size_bytes)
491491
self.send_request(frame)
492492
frame.sent_future.add_done_callback(lambda _: self.finish_stream(stream_id))
493493
return frame.sent_future

tests/rsocket/test_fire_and_forget.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,30 @@ def handler_factory():
3636
await asyncio.sleep(2) # wait for server to close
3737

3838

39+
async def test_request_fire_and_forget_fragmented(lazy_pipe):
40+
handler: Optional[FireAndForgetHandler] = None
41+
42+
def handler_factory():
43+
nonlocal handler
44+
handler = FireAndForgetHandler()
45+
return handler
46+
47+
async with lazy_pipe(
48+
server_arguments={'handler_factory': handler_factory,
49+
'fragment_size_bytes': 10240},
50+
client_arguments={'fragment_size_bytes': 10240}) as (server, client):
51+
data = bytearray(16777210)
52+
53+
await client.fire_and_forget(Payload(data))
54+
55+
await handler.received.wait()
56+
57+
assert handler.received_payload.data == data
58+
assert handler.received_payload.metadata == b''
59+
60+
await asyncio.sleep(2) # wait for server to close
61+
62+
3963
async def test_request_fire_and_forget_awaitable_client(lazy_pipe):
4064
handler: Optional[FireAndForgetHandler] = None
4165

0 commit comments

Comments
 (0)