Skip to content

Commit

Permalink
Merge pull request #9 from noisyboiler/0.7.4
Browse files Browse the repository at this point in the history
topic subscription features
  • Loading branch information
noisyboiler authored Jan 26, 2017
2 parents 958c56b + 860aa22 commit eda7391
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 44 deletions.
9 changes: 7 additions & 2 deletions test/test_publishing.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
from mock import ANY

from wampy.peers.clients import Client
from wampy.roles.subscriber import subscribe
Expand Down Expand Up @@ -111,8 +112,12 @@ def foo_topic_handler(self, **kwargs):

def check_kwargs():
assert SubscribingClient.received_kwargs == {
"message": "foobar",
"spam": "eggs"
'message': 'foobar',
'spam': 'eggs',
'_meta': {
'topic': 'foo',
'subscription_id': ANY,
},
}

assert_stops_raising(check_kwargs)
File renamed without changes.
11 changes: 7 additions & 4 deletions test/test_roles.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@ def publisher(self, router):
with Client() as client:
yield client

def test_subscribe_to_topic(self, router, publisher):
def test_subscribe_to_topics(self, router, publisher):
message_handler = Mock()

subscriber = TopicSubscriber(
router=router, realm=DEFAULT_REALM, topic="foo",
router=router, realm=DEFAULT_REALM, topics=["foo", "spam"],
message_handler=message_handler)

def wait_for_message():
assert message_handler.called is True
assert message_handler.call_args == call(u'bar')
assert message_handler.call_count == 2
assert message_handler.call_args_list == [
call(u'bar'), call(u'ham')
]

with subscriber:
publisher.publish(topic="foo", message="bar")
publisher.publish(topic="spam", message="ham")
assert_stops_raising(wait_for_message)
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,13 @@ def spam_handler(self, **kwargs):
spam_subscriber = SpamClientClient()

with foo_subscriber:
foo_subscription_id, topic = foo_subscriber.subscription_map[
'foo_handler']
for foo_subscription_id, details in foo_subscriber.subscription_map.items():
handler_name, topic = details
if handler_name == "foo_handler":
break

_, topic = foo_subscriber.subscription_map[
foo_subscription_id]

def check_list():
subscription_list = foo_subscriber.get_subscription_list()
Expand All @@ -149,11 +154,17 @@ def check_list():
assert_stops_raising(check_list)

with another_foo_subscriber:
another_subscription_id, topic = (
another_foo_subscriber.subscription_map['foo_handler']
for another_subscription_id, details in foo_subscriber.subscription_map.items():
handler_name, topic = details
if handler_name == "foo_handler":
break

handler_name, topic = (
another_foo_subscriber.subscription_map[another_subscription_id]
)

assert another_subscription_id == foo_subscription_id
assert topic == "foo"

def check_list():
subscription_list = (
Expand All @@ -162,15 +173,15 @@ def check_list():
assert len(subscription_list['exact']) == 1
assert another_subscription_id in subscription_list[
'exact']
assert topic == "foo"

assert_stops_raising(check_list)

with spam_subscriber:
# now there are 3 clients and 2 subscriptions

spam_subscription_id, _ = spam_subscriber.subscription_map[
'spam_handler']
for spam_subscription_id, details in spam_subscriber.subscription_map.items():
handler_name, topic = details
if handler_name == "spam_handler":
break

def check_list():
subscription_list = (
Expand Down Expand Up @@ -243,14 +254,20 @@ def foo_handler(self, **kwargs):
assert len(foo_subscriber.subscription_map) == 0

with foo_subscriber:
for foo_subscription_id, details in foo_subscriber.subscription_map.items():
handler_name, topic = details
if handler_name == "foo_handler":
break

_, topic = foo_subscriber.subscription_map[foo_subscription_id]
assert topic == "foo"

subscription_id = foo_subscriber.get_subscription_lookup(
topic="foo")
assert subscription_id is not None
assert foo_subscription_id == subscription_id
assert len(foo_subscriber.subscription_map) == 1

foo_subscription_id, topic = foo_subscriber.subscription_map[
'foo_handler']
assert topic == "foo"
assert subscription_id == foo_subscription_id
assert len(foo_subscriber.subscription_map) == 0

def test_subscription_lookup_topic_not_found(self, router):
class FooClient(Client):
Expand All @@ -268,17 +285,18 @@ def foo_handler(self, **kwargs):
pass

foo_subscriber = FooClient()
another_foo_subscriber = FooClient()

just_a_client = Client()

with foo_subscriber:
with another_foo_subscriber:
with just_a_client:
subscription_id, _ = foo_subscriber.subscription_map[
'foo_handler']
info = just_a_client.get_subscription_info(
subscription_id=subscription_id)
with just_a_client:
for subscription_id, details in foo_subscriber.subscription_map.items():
handler_name, topic = details
if handler_name == "foo_handler" and topic == "foo":
break

info = just_a_client.get_subscription_info(
subscription_id=subscription_id
)

expected_info = {
'created': ANY,
Expand Down
6 changes: 6 additions & 0 deletions wampy/peers/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ def rpc(self):
def publish(self):
return PublishProxy(client=self)

def get_subscription_handler_names(self):
handler_names = []
for handler, topic in self.subscription_map.values():
handler_names.append(handler)
return handler_names

def get_subscription_info(self, subscription_id):
""" Retrieves information on a particular subscription. """
return self.call("wamp.subscription.get", subscription_id)
Expand Down
23 changes: 11 additions & 12 deletions wampy/roles/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def subscribe_to_topic(session, topic, handler):
topic, wamp_code)
)

session.subscription_map[procedure_name] = subscription_id, topic
session.subscription_map[subscription_id] = procedure_name, topic

logger.info(
'registered handler "%s" for topic "%s"', procedure_name, topic
Expand Down Expand Up @@ -62,7 +62,7 @@ class TopicSubscriber(object):
""" Stand alone websocket topic subscriber """

def __init__(
self, router, realm, topic, message_handler,
self, router, realm, topics, message_handler,
roles=None, transport="websocket",
):
""" Subscribe to a single topic.
Expand All @@ -74,15 +74,15 @@ def __init__(
router: instance
subclass of :cls:`wampy.peers.routers.Router`
realm : string
topic : string
topics : list
message_handler : func
roles: dictionary
"""
self.id = str(uuid4())
self.router = router
self.realm = realm
self.topic = topic
self.topics = topics
self.message_handler = message_handler
self.roles = roles or {
'roles': {
Expand All @@ -107,19 +107,18 @@ def __exit__(self, exception_type, exception_value, traceback):

def start(self):
self.session.begin()
subscribe_to_topic(
session=self.session, topic=self.topic, handler=self.topic_handler
)
for topic in self.topics:
subscribe_to_topic(
session=self.session, topic=topic, handler=self.topic_handler
)

self.subscribed = True
logger.info("subscribed to %s", self.topic)
logger.info("subscribed to %s", ", ".join(self.topics))

def stop(self):
self.session.end()
self.subscribed = False

def topic_handler(self, *args, **kwargs):
logger.info(
"handling message from %s topic: (%s, %s)",
self.topic, args, kwargs
)
logger.info("handling message: (%s, %s)", args, kwargs)
self.message_handler(kwargs['message'])
10 changes: 5 additions & 5 deletions wampy/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,18 +223,18 @@ def handle_message(self, message):
# ]
_, subscription_id, _, details = message

id_to_func_name_map = {
v[0]: k for k, v in self.subscription_map.items()
}

func_name = id_to_func_name_map[subscription_id]
func_name, topic = self.subscription_map[subscription_id]
try:
func = getattr(self.client, func_name)
except AttributeError:
raise WampError(
"Event handler not found: {}".format(func_name)
)

payload_dict['_meta'] = {}
payload_dict['_meta']['topic'] = topic
payload_dict['_meta']['subscription_id'] = subscription_id

func(*payload_list, **payload_dict)

else:
Expand Down

0 comments on commit eda7391

Please sign in to comment.