Skip to content

Commit 6c8e155

Browse files
committed
revert changes and update examples
1 parent 0dfc710 commit 6c8e155

File tree

4 files changed

+18
-16
lines changed

4 files changed

+18
-16
lines changed

examples/server_with_routing.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,11 @@ async def metadata_push(payload: Payload, composite_metadata: CompositeMetadata)
7272
if item.encoding == b'text/plain':
7373
storage.last_metadata_push = item.content
7474

75-
@router.channel('channel', send_request_payload_to_subscriber=True)
76-
async def channel_response(payload, composite_metadata):
75+
@router.channel('channel')
76+
async def channel_response(payload:Payload, composite_metadata):
7777
logging.info('Got channel request')
7878
subscriber = LoggingSubscriber()
79+
subscriber.on_next(payload)
7980
channel = sample_async_response_stream(local_subscriber=subscriber)
8081
return channel, subscriber
8182

examples/tutorial/reactivex/chat_server.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ async def receive_statistics(statistics: ClientStatistics):
180180
self._session.statistics = statistics
181181

182182
@router.channel('statistics')
183-
async def send_statistics() -> ReactivexChannel:
183+
async def send_statistics(initial_payload: Payload) -> ReactivexChannel:
184184

185185
async def statistics_generator():
186186
while True:
@@ -201,6 +201,8 @@ def on_next(payload: Payload):
201201
if request.period_seconds is not None:
202202
self._session.requested_statistics.period_seconds = request.period_seconds
203203

204+
on_next(initial_payload)
205+
204206
return ReactivexChannel(
205207
from_observable_with_backpressure(
206208
lambda backpressure: observable_from_async_generator(

rsocket/reactivex/reactivex_handler_adapter.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,13 @@ async def request_fire_and_forget(self, payload: Payload):
4848
await self.delegate.request_fire_and_forget(payload)
4949

5050
async def request_response(self, payload: Payload) -> asyncio.Future:
51-
observable = await self.delegate.request_response(payload)
51+
response = await self.delegate.request_response(payload)
52+
53+
if isinstance(response, asyncio.Future):
54+
observable = await response
55+
else:
56+
observable = response
57+
5258
return observable.pipe(
5359
operators.default_if_empty(Payload()),
5460
operators.to_future()

rsocket/routing/request_router.py

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,20 @@
1717

1818

1919
class RouteInfo:
20-
def __init__(self, method, send_channel_request_payload_to_subscriber=False):
20+
def __init__(self, method):
2121
self.method: Callable = method
2222
self.signature = signature(method)
23-
self.send_channel_request_payload_to_subscriber = send_channel_request_payload_to_subscriber
2423

2524

26-
def decorator_factory(container: dict, route: str, send_channel_request_payload_to_subscriber: bool = False):
25+
def decorator_factory(container: dict, route: str):
2726
def decorator(function: decorated_method):
2827
if safe_len(route) == 0:
2928
raise RSocketEmptyRoute(function.__name__)
3029

3130
if route in container:
3231
raise KeyError('Duplicate route "%s" already registered', route)
3332

34-
container[route] = RouteInfo(function, send_channel_request_payload_to_subscriber)
33+
container[route] = RouteInfo(function)
3534
return function
3635

3736
return decorator
@@ -107,9 +106,8 @@ def wrapper(function):
107106

108107
return wrapper
109108

110-
def channel(self, route: str, send_request_payload_to_subscriber: bool = False):
111-
return decorator_factory(self._channel_routes, route,
112-
send_channel_request_payload_to_subscriber=send_request_payload_to_subscriber)
109+
def channel(self, route: str):
110+
return decorator_factory(self._channel_routes, route)
113111

114112
def channel_unknown(self):
115113
def wrapper(function):
@@ -164,11 +162,6 @@ async def route(self,
164162

165163
return create_future(result)
166164

167-
elif frame_type == FrameType.REQUEST_CHANNEL:
168-
publisher, subscriber = result
169-
if route_info.send_channel_request_payload_to_subscriber:
170-
subscriber.on_next(payload)
171-
172165
return result
173166

174167
def _collect_route_arguments(self,

0 commit comments

Comments
 (0)