Skip to content

Commit

Permalink
more types!
Browse files Browse the repository at this point in the history
add annotations on more sugar methods, auth, devices

as well as proper types on most async wrappers
  • Loading branch information
minrk committed Jan 11, 2022
1 parent ab72e55 commit 2dc67d5
Show file tree
Hide file tree
Showing 60 changed files with 1,019 additions and 436 deletions.
6 changes: 3 additions & 3 deletions examples/asyncio/coroutines.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
ctx = Context.instance()


async def ping():
async def ping() -> None:
"""print dots to indicate idleness"""
while True:
await asyncio.sleep(0.5)
print('.')


async def receiver():
async def receiver() -> None:
"""receive messages with polling"""
pull = ctx.socket(zmq.PULL)
pull.connect(url)
Expand All @@ -34,7 +34,7 @@ async def receiver():
print('recvd', msg)


async def sender():
async def sender() -> None:
"""send a message every second"""
tic = time.time()
push = ctx.socket(zmq.PUSH)
Expand Down
34 changes: 22 additions & 12 deletions examples/asyncio/helloworld_pubsub_dealerrouter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@
import logging
import traceback

import zmq
import zmq.asyncio
from zmq.asyncio import Context


# set message based on language
class HelloWorld:
def __init__(self):
def __init__(self) -> None:
self.lang = 'eng'
self.msg = "Hello World"

def change_language(self):
def change_language(self) -> None:
if self.lang == 'eng':
self.lang = 'jap'
self.msg = "Hello Sekai"
Expand All @@ -31,21 +32,21 @@ def change_language(self):
self.lang = 'eng'
self.msg = "Hello World"

def msg_pub(self):
def msg_pub(self) -> str:
return self.msg


# receives "Hello World" from topic 'world'
# changes "World" to "Sekai" and returns message 'sekai'
class HelloWorldPrinter:
# process received message
def msg_sub(self, msg):
def msg_sub(self, msg: str) -> None:
print(f"message received world: {msg}")


# manages message flow between publishers and subscribers
class HelloWorldMessage:
def __init__(self, url='127.0.0.1', port='5555'):
def __init__(self, url: str = '127.0.0.1', port: int = 5555):
# get ZeroMQ version
print("Current libzmq version is %s" % zmq.zmq_version())
print("Current pyzmq version is %s" % zmq.__version__)
Expand All @@ -57,6 +58,8 @@ def __init__(self, url='127.0.0.1', port='5555'):
# init hello world publisher obj
self.hello_world = HelloWorld()

def main(self) -> None:

# activate publishers / subscribers
asyncio.get_event_loop().run_until_complete(
asyncio.wait(
Expand All @@ -70,7 +73,7 @@ def __init__(self, url='127.0.0.1', port='5555'):
)

# generates message "Hello World" and publish to topic 'world'
async def hello_world_pub(self):
async def hello_world_pub(self) -> None:
pub = self.ctx.socket(zmq.PUB)
pub.connect(self.url)

Expand Down Expand Up @@ -106,7 +109,7 @@ async def hello_world_pub(self):
pass

# processes message topic 'world'; "Hello World" or "Hello Sekai"
async def hello_world_sub(self):
async def hello_world_sub(self) -> None:
print("Setting up world sub")
obj = HelloWorldPrinter()
# setup subscriber
Expand All @@ -120,7 +123,7 @@ async def hello_world_sub(self):
# keep listening to all published message on topic 'world'
while True:
[topic, msg] = await sub.recv_multipart()
print(f"world sub; topic: {topic}\tmessage: {msg}")
print(f"world sub; topic: {topic.decode()}\tmessage: {msg.decode()}")
# process message
obj.msg_sub(msg.decode('utf-8'))

Expand All @@ -141,7 +144,7 @@ async def hello_world_sub(self):
pass

# Deal a message to topic 'lang' that language should be changed
async def lang_changer_dealer(self):
async def lang_changer_dealer(self) -> None:
# setup dealer
deal = self.ctx.socket(zmq.DEALER)
deal.setsockopt(zmq.IDENTITY, b'lang_dealer')
Expand Down Expand Up @@ -176,7 +179,7 @@ async def lang_changer_dealer(self):
pass

# changes Hello xxx message when a command is received from topic 'lang'; keeps listening for commands
async def lang_changer_router(self):
async def lang_changer_router(self) -> None:
# setup router
rout = self.ctx.socket(zmq.ROUTER)
rout.bind(self.url[:-1] + f"{int(self.url[-1]) + 1}")
Expand All @@ -188,7 +191,9 @@ async def lang_changer_router(self):
# keep listening to all published message on topic 'world'
while True:
[id_dealer, msg] = await rout.recv_multipart()
print(f"Command rout; Sender ID: {id_dealer};\tmessage: {msg}")
print(
f"Command rout; Sender ID: {id_dealer!r};\tmessage: {msg.decode()}"
)

self.hello_world.change_language()
print(
Expand All @@ -208,5 +213,10 @@ async def lang_changer_router(self):
pass


def main() -> None:
hello_world = HelloWorldMessage()
hello_world.main()


if __name__ == '__main__':
HelloWorldMessage()
main()
20 changes: 6 additions & 14 deletions examples/asyncio/tornado_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,32 @@
import asyncio

from tornado.ioloop import IOLoop
from tornado.platform.asyncio import AsyncIOMainLoop

import zmq.asyncio

# Tell tornado to use asyncio
AsyncIOMainLoop().install()

# This must be instantiated after the installing the IOLoop
queue = asyncio.Queue() # type: ignore
ctx = zmq.asyncio.Context()


async def pushing():
server = ctx.socket(zmq.PUSH)
async def pushing() -> None:
server = zmq.asyncio.Context.instance().socket(zmq.PUSH)
server.bind('tcp://*:9000')
while True:
await server.send(b"Hello")
await asyncio.sleep(1)


async def pulling():
client = ctx.socket(zmq.PULL)
async def pulling() -> None:
client = zmq.asyncio.Context.instance().socket(zmq.PULL)
client.connect('tcp://127.0.0.1:9000')
while True:
greeting = await client.recv()
print(greeting)


def zmq_tornado_loop():
def main() -> None:
loop = IOLoop.current()
loop.spawn_callback(pushing)
loop.spawn_callback(pulling)
loop.start()


if __name__ == '__main__':
zmq_tornado_loop()
main()
3 changes: 2 additions & 1 deletion examples/chat/display.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
#
# You should have received a copy of the Lesser GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from typing import List

import zmq


def main(addrs):
def main(addrs: List[str]):

context = zmq.Context()
socket = context.socket(zmq.SUB)
Expand Down
2 changes: 1 addition & 1 deletion examples/chat/prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import zmq


def main(addr, who):
def main(addr: str, who: str):

ctx = zmq.Context()
socket = ctx.socket(zmq.PUB)
Expand Down
4 changes: 2 additions & 2 deletions examples/cython/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import zmq


def python_sender(url, n):
def python_sender(url: str, n: int) -> None:
"""Use entirely high-level Python APIs to send messages"""
ctx = zmq.Context()
s = ctx.socket(zmq.PUSH)
Expand All @@ -23,7 +23,7 @@ def python_sender(url, n):
s.send(buf)


def main():
def main() -> None:
import argparse

parser = argparse.ArgumentParser(description="send & recv messages with Cython")
Expand Down
4 changes: 2 additions & 2 deletions examples/draft/radio-dish.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@

for i in range(10):
time.sleep(0.1)
radio.send(b'%03i' % i, group='numbers')
radio.send(f'{i:03}'.encode('ascii'), group='numbers')
try:
msg = dish.recv(copy=False)
except zmq.Again:
print('missed a message')
continue
print("Received {}:{}".format(msg.group, msg.bytes.decode('utf8')))
print(f"Received {msg.group}:{msg.bytes.decode('utf8')}")

dish.close()
radio.close()
Expand Down
8 changes: 4 additions & 4 deletions examples/eventloop/asyncweb.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from zmq.eventloop.future import Context as FutureContext


def slow_responder():
def slow_responder() -> None:
"""thread for slowly responding to replies."""
ctx = zmq.Context()
socket = ctx.socket(zmq.ROUTER)
Expand All @@ -35,14 +35,14 @@ def slow_responder():
i += 1


def dot():
def dot() -> None:
"""callback for showing that IOLoop is still responsive while we wait"""
sys.stdout.write('.')
sys.stdout.flush()


class TestHandler(web.RequestHandler):
async def get(self):
async def get(self) -> None:
ctx = FutureContext.instance()
s = ctx.socket(zmq.DEALER)

Expand All @@ -56,7 +56,7 @@ async def get(self):
self.write(reply)


def main():
def main() -> None:
worker = threading.Thread(target=slow_responder)
worker.daemon = True
worker.start()
Expand Down
11 changes: 7 additions & 4 deletions examples/eventloop/echostream.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
#!/usr/bin/env python
"""Adapted echo.py to put the send in the event loop using a ZMQStream.
"""
from typing import List

from tornado import ioloop

import zmq
from zmq.eventloop import ioloop, zmqstream
from zmq.eventloop import zmqstream

loop = ioloop.IOLoop.instance()
loop = ioloop.IOLoop.current()

ctx = zmq.Context()
s = ctx.socket(zmq.ROUTER)
s.bind('tcp://127.0.0.1:5555')
stream = zmqstream.ZMQStream(s, loop)
stream = zmqstream.ZMQStream(s)


def echo(msg):
def echo(msg: List[bytes]):
print(" ".join(map(repr, msg)))
stream.send_multipart(msg)

Expand Down
6 changes: 3 additions & 3 deletions examples/gevent/poll.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ def sender():
while msgcnt < 10:
socks = dict(poller.poll())
if receiver1 in socks and socks[receiver1] == zmq.POLLIN:
print("Message from receiver1: %s" % receiver1.recv())
print(f"Message from receiver1: {receiver1.recv()!r}")
msgcnt += 1

if receiver2 in socks and socks[receiver2] == zmq.POLLIN:
print("Message from receiver2: %s" % receiver2.recv())
print(f"Message from receiver2: {receiver2.recv()!r}")
msgcnt += 1

print("%d messages received" % msgcnt)
print(f"{msgcnt} messages received")
6 changes: 4 additions & 2 deletions examples/gevent/simple.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from gevent import spawn, spawn_later

import zmq.green as zmq
Expand All @@ -23,7 +25,7 @@
sock.connect('ipc:///tmp/zmqtest')


def get_objs(sock):
def get_objs(sock: zmq.Socket):
while True:
o = sock.recv_pyobj()
print('received python object:', o)
Expand All @@ -32,7 +34,7 @@ def get_objs(sock):
break


def print_every(s, t=None):
def print_every(s: str, t: Optional[float] = None):
print(s)
if t:
spawn_later(t, print_every, s, t)
Expand Down
Loading

0 comments on commit 2dc67d5

Please sign in to comment.