Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add docs #17

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 0 additions & 143 deletions README.md

This file was deleted.

156 changes: 156 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
ANSQ - The Async NSQ client
===========================

|PyPI version| |Tests| |Coverage| |Python|

.. |PyPI version| image:: https://badge.fury.io/py/ansq.svg
:alt: PyPI version
:target: https://badge.fury.io/py/ansq
.. |Tests| image:: https://github.com/list-family/ansq/workflows/Test/badge.svg
:alt: Tests
.. |Coverage| image:: https://codecov.io/gh/list-family/ansq/branch/master/graph/badge.svg
:alt: Coverage
:target: https://codecov.io/gh/list-family/ansq
.. |Python| image:: https://img.shields.io/pypi/pyversions/ansq.svg
:alt: Python

Written with native Asyncio NSQ package

Features
---------

- TCP client
- HTTP wrapper
- One connection for writer and reader
- Self-healing: when the NSQ connection is lost, reconnects, sends identify
and auth commands, subscribes to previous topic/channel
- Many helper-methods in each class

Release v1.0 Roadmap
--------------------

- [x] Docs
- [ ] HTTP API wrapper refactoring
- [ ] Deflate, Snappy compressions
- [ ] TLSv1

Example 1: Consumer
-------------------

.. code-block:: python

import asyncio
from ansq import open_connection
from ansq.tcp.connection import NSQConnection


async def main(nsq: NSQConnection):
await nsq.subscribe('test_topic', 'channel1', 2)
while True:
async for message in nsq.messages():
print('Message: ' + str(message))
# message.body is bytes,
# __str__ method decodes bytes
#
# Something do with messages...
# Then, mark as processed it
await message.fin()

# If you don't break the loop
# and don't set auth_reconnect parameter to False,
# but you have reached this point,
# it means that the NSQ connection is lost
# and cannot reconnect.
#
# Errors in ansq package are logging with ERROR level,
# so you can see errors in console.
print('Connection status: ' + str(nsq.status))
# Prints one of this:
# Connection status: ConnectionStatus.CLOSING
# Connection status: ConnectionStatus.CLOSED

# You can reconnect here in try-except block
# or just leave the function and finish the program.
# It's all depends on the design of your application.
return


if __name__ == '__main__':
loop = asyncio.get_event_loop()
nsq_connection = loop.run_until_complete(open_connection())

try:
loop.run_until_complete(main(nsq_connection))
except KeyboardInterrupt:
pass

# You should close connection correctly
loop.run_until_complete(nsq_connection.close())


Example 2: Writer and reader
----------------------------

.. code-block:: python

import asyncio
from ansq import open_connection


async def main():
nsq = await open_connection()
print(await nsq.pub('test_topic', 'test_message'))
# <NSQResponseSchema frame_type:FrameType.RESPONSE, body:b'OK', is_ok:True>
print(await nsq.dpub('test_topic', 'test_message', 3))
# <NSQResponseSchema frame_type:FrameType.RESPONSE, body:b'OK', is_ok:True>
print(await nsq.mpub('test_topic', list('test_message')))
# <NSQResponseSchema frame_type:FrameType.RESPONSE, body:b'OK', is_ok:True>

await nsq.subscribe('test_topic', 'channel1', 2)
processed_messages = 0
async for message in nsq.messages():
print('Message #{}: {}'.format(processed_messages, message))
# Message #0: test_message
# Message #1: t
# Message #2: e
# Message #3: s
# Message #4: t
# ...
# Message #10: test_message
await message.fin()
processed_messages += 1

if processed_messages == 10:
break

single_message = await nsq.wait_for_message()
print('Single message: ' + str(single_message))
# message.body is bytes,
# __str__ method decodes bytes
# Prints decoded message.body

# Also it has real good repr
print(repr(single_message))
# <NSQMessage id="0d406ce4661af003", body=b'e', attempts=1,
# timestamp=1590162134305413767, timeout=60000,
# initialized_at=1590162194.8242455, is_timed_out=False,
# is_processed=False>

# Very long task
# ...
# We need to touch message or it will be timed out
await single_message.touch()
# Continue very long task
# ...

# Something went wrong in task
# in except handler re-queue message
await single_message.req()

# Connection should be closed
await nsq.close()


if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
2 changes: 1 addition & 1 deletion ansq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .tcp.connection import open_connection

__all__ = ['tcp', 'http', 'open_connection']
__all__ = ['open_connection']
6 changes: 6 additions & 0 deletions ansq/tcp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .connection import open_connection, NSQConnection

__all__ = (
'NSQConnection',
'open_connection'
)
10 changes: 5 additions & 5 deletions ansq/tcp/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,9 @@ async def execute(
) -> Optional[Union[NSQResponseSchema, NSQErrorSchema, NSQMessageSchema]]:
"""Execute command

Be careful: commands ``NOP``, ``FIN``, ``RDY``, ``REQ``, ``TOUCH``
by NSQ spec returns ``None`` as The class:`asyncio.Future` result.
.. warning::
Be careful: commands ``NOP``, ``FIN``, ``RDY``, ``REQ``
and ``TOUCH`` returns ``None`` result by NSQ spec.

:returns: The response from NSQ.
"""
Expand Down Expand Up @@ -426,9 +427,8 @@ def get_message(self) -> Optional[NSQMessage]:
async def wait_for_message(self) -> NSQMessage:
"""Shortcut for `asyncio.Queue.get()``.

:rtype: :class:`NSQMessage`
:returns: :class:`NSQMessage`.
Be aware on closing action may returns ``None``.
:rtype: :class:`~NSQMessage`
:returns: Be aware on closing action may returns ``None``.
This is need to exit from ``NSQConnection.messages`` generator
when connection closed with exception.
"""
Expand Down
15 changes: 8 additions & 7 deletions ansq/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

class JSONEncoder(json.JSONEncoder):
def default(self, obj) -> str:
""""""
try:
return convert_to_str(obj)
except TypeError:
Expand Down Expand Up @@ -41,9 +42,9 @@ def convert_to_bytes(value) -> bytes:
"""Dispatch for convertible types.

Allowed types: ``bytes``, ``bytearray``, ``str``, ``int``, ``float``,
``dict``, ``Decimal``, ``dataclass``.
``dict``, ``Decimal``, ``dataclass``.

:raises TypeError:
:raises TypeError: Unexpected value type
"""
if PY37:
from dataclasses import asdict, is_dataclass
Expand Down Expand Up @@ -100,13 +101,13 @@ def _(value: datetime) -> bytes:


@singledispatch
def convert_to_str(value):
def convert_to_str(value) -> str:
"""Dispatch for convertible types.

Allowed types: ``bytes``, ``bytearray``, ``str``, ``int``, ``float``,
``dict``, ``Decimal``, ``dataclass``.
``dict``, ``Decimal``, ``dataclass``.

:raises TypeError:
:raises TypeError: Unexpected value type
"""
if PY37:
from dataclasses import asdict, is_dataclass
Expand Down Expand Up @@ -163,8 +164,8 @@ def _(value: datetime) -> str:
return value.isoformat()


def get_logger(debug: bool = False):
"""Get the ansq logger.
def get_logger(debug: bool = False) -> logging.Logger:
"""Get the ``ansq`` logger.

:params debug: Set up debug level.
:type debug: :class:`bool`
Expand Down
Loading