Skip to content

Commit

Permalink
Use future annotations (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
atugushev authored Jul 27, 2023
1 parent a9c220b commit ee1ab75
Show file tree
Hide file tree
Showing 39 changed files with 191 additions and 149 deletions.
2 changes: 2 additions & 0 deletions ansq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from .tcp.connection import ConnectionFeatures, ConnectionOptions, open_connection
from .tcp.reader import create_reader
from .tcp.writer import create_writer
Expand Down
2 changes: 2 additions & 0 deletions ansq/http/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from .lookupd import NsqLookupd
from .writer import NSQDHTTPWriter

Expand Down
10 changes: 6 additions & 4 deletions ansq/http/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import asyncio
import json
from typing import TYPE_CHECKING, Any, Optional, Type, TypeVar
from typing import TYPE_CHECKING, Any, TypeVar

import aiohttp

Expand All @@ -24,7 +26,7 @@ def __init__(
host: str = "127.0.0.1",
port: int = 4151,
*,
loop: Optional["AbstractEventLoop"] = None,
loop: AbstractEventLoop | None = None,
) -> None:
self._loop = loop or asyncio.get_event_loop()
self._endpoint = (host, port)
Expand All @@ -34,9 +36,9 @@ def __init__(

@classmethod
def from_address(
cls: Type[_T],
cls: type[_T],
address: str,
loop: Optional["AbstractEventLoop"] = None,
loop: AbstractEventLoop | None = None,
) -> _T:
try:
host, port_str = address.split(":")
Expand Down
4 changes: 2 additions & 2 deletions ansq/http/http_exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict
from __future__ import annotations


class NSQHTTPException(Exception):
Expand All @@ -19,7 +19,7 @@ def error(self) -> str:
return self.args[1]

@property
def info(self) -> Dict:
def info(self) -> dict:
"""Dict of returned error info from ES, where available."""
return self.args[2]

Expand Down
2 changes: 2 additions & 0 deletions ansq/http/lookupd.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from ansq.typedefs import HTTPResponse

from .base import NSQHTTPConnection
Expand Down
2 changes: 2 additions & 0 deletions ansq/http/writer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from typing import Any

from ansq.typedefs import HTTPResponse
Expand Down
32 changes: 17 additions & 15 deletions ansq/tcp/connection.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import annotations

import asyncio
import json
import warnings
from datetime import datetime, timezone
from typing import Any, AsyncGenerator, Callable, Mapping, Optional, Union
from typing import Any, AsyncGenerator, Callable, Mapping

import attr

Expand Down Expand Up @@ -124,7 +126,7 @@ async def _do_auto_reconnect(

async def _do_close(
self,
error: Optional[Union[Exception, str]] = None,
error: Exception | str | None = None,
change_status: bool = True,
silent: bool = False,
) -> None:
Expand Down Expand Up @@ -196,10 +198,10 @@ async def _do_close(

async def execute(
self,
command: Union[str, bytes],
command: str | bytes,
*args: Any,
data: Optional[Any] = None,
callback: Optional[Callable[[TCPResponse], Any]] = None,
data: Any | None = None,
callback: Callable[[TCPResponse], Any] | None = None,
) -> TCPResponse:
"""Execute command
Expand Down Expand Up @@ -263,9 +265,9 @@ async def execute(

async def identify(
self,
config: Optional[Union[dict, str]] = None,
config: dict | str | None = None,
*,
features: Optional[ConnectionFeatures] = None,
features: ConnectionFeatures | None = None,
**kwargs: Any,
) -> TCPResponse:
"""Executes `IDENTIFY` command.
Expand Down Expand Up @@ -362,7 +364,7 @@ async def _read_data_task(self) -> None:
"""Response reader task."""
assert self._reader is not None

error: Optional[Exception] = None
error: Exception | None = None

while not self._reader.at_eof():
try:
Expand Down Expand Up @@ -461,10 +463,10 @@ async def _read_buffer(self) -> None:
while is_continue:
is_continue = await self._parse_data()

def _start_upgrading(self, resp: Optional[TCPResponse] = None) -> None:
def _start_upgrading(self, resp: TCPResponse | None = None) -> None:
self._is_upgrading = True

async def _finish_upgrading(self, resp: Optional[TCPResponse] = None) -> None:
async def _finish_upgrading(self, resp: TCPResponse | None = None) -> None:
await self._read_buffer()
self._is_upgrading = False

Expand Down Expand Up @@ -522,13 +524,13 @@ async def rdy(self, messages_count: int = 1) -> None:
self.rdy_messages_count = messages_count
await self.execute(NSQCommands.RDY, messages_count)

async def fin(self, message_id: Union[str, NSQMessage]) -> None:
async def fin(self, message_id: str | NSQMessage) -> None:
"""Finish a message (indicate successful processing)"""
if isinstance(message_id, NSQMessage):
await message_id.fin()
await self.execute(NSQCommands.FIN, message_id)

async def req(self, message_id: Union[str, NSQMessage], timeout: int = 0) -> None:
async def req(self, message_id: str | NSQMessage, timeout: int = 0) -> None:
"""Re-queue a message (indicate failure to process)
The re-queued message is placed at the tail of the queue,
Expand All @@ -538,7 +540,7 @@ async def req(self, message_id: Union[str, NSQMessage], timeout: int = 0) -> Non
await message_id.req(timeout)
await self.execute(NSQCommands.REQ, message_id, timeout)

async def touch(self, message_id: Union[str, NSQMessage]) -> None:
async def touch(self, message_id: str | NSQMessage) -> None:
"""Reset the timeout for an in-flight message"""
if isinstance(message_id, NSQMessage):
await message_id.touch()
Expand Down Expand Up @@ -570,7 +572,7 @@ async def messages(self) -> AsyncGenerator[NSQMessage, None]:
continue
yield message

def get_message(self) -> Optional[NSQMessage]:
def get_message(self) -> NSQMessage | None:
"""Shortcut for ``asyncio.Queue.get_nowait()``
without raising exceptions
"""
Expand All @@ -579,7 +581,7 @@ def get_message(self) -> Optional[NSQMessage]:
except asyncio.QueueEmpty:
return None

async def wait_for_message(self) -> Optional[NSQMessage]:
async def wait_for_message(self) -> NSQMessage | None:
"""Shortcut for `asyncio.Queue.get()``.
:rtype: :class:`NSQMessage`
Expand Down
2 changes: 2 additions & 0 deletions ansq/tcp/consts.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

NL = b"\n"
DATA_SIZE = 4
FRAME_SIZE = 4
Expand Down
4 changes: 2 additions & 2 deletions ansq/tcp/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Union
from __future__ import annotations


class ConnectionClosedError(Exception):
Expand Down Expand Up @@ -132,5 +132,5 @@ class NSQTouchFailed(NSQErrorCode):
# E_FIN_FAILED


def get_exception(code: str, error_message: Union[str, bytes]) -> NSQException:
def get_exception(code: str, error_message: str | bytes) -> NSQException:
return ERROR_CODES.get(code, NSQErrorCode)(f"{code}: {error_message!r}")
20 changes: 10 additions & 10 deletions ansq/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
:see: https://nsq.io/clients/tcp_protocol_spec.html
"""
from __future__ import annotations

import abc
import struct
from typing import Any, Optional, Tuple, Union
from typing import Any

from ansq.tcp import consts
from ansq.tcp.exceptions import ProtocolError
Expand All @@ -29,12 +31,12 @@ def get(self) -> Any:
pass

@abc.abstractmethod # pragma: no cover
def encode_command(self, cmd: str, *args: Any, data: Optional[Any] = None) -> bytes:
def encode_command(self, cmd: str, *args: Any, data: Any | None = None) -> bytes:
pass


class Reader(BaseReader):
def __init__(self, buffer: Optional[bytes] = None):
def __init__(self, buffer: bytes | None = None):
self._buffer = bytearray()
self._is_header = False
self._payload_size = 0
Expand All @@ -57,7 +59,7 @@ def feed(self, chunk: bytes) -> None:

def get(
self,
) -> Optional[Union[NSQResponseSchema, NSQErrorSchema, NSQMessageSchema]]:
) -> NSQResponseSchema | NSQErrorSchema | NSQMessageSchema | None:
"""Get from buffer NSQ response
:raises ProtocolError: On unexpected NSQ message's FrameType
Expand Down Expand Up @@ -86,7 +88,7 @@ def get(

def _parse_payload(
self, frame_type: FrameType, payload_size: int
) -> Union[NSQResponseSchema, NSQErrorSchema, NSQMessageSchema]:
) -> NSQResponseSchema | NSQErrorSchema | NSQMessageSchema:
"""Parse from buffer NSQ response
:raises ProtocolError: On unexpected NSQ message's FrameType
Expand Down Expand Up @@ -114,13 +116,13 @@ def _unpack_response(self, payload_size: int) -> bytes:
end = consts.DATA_SIZE + payload_size
return bytes(self._buffer[start:end])

def _unpack_error(self, payload_size: int) -> Tuple[bytes, bytes]:
def _unpack_error(self, payload_size: int) -> tuple[bytes, bytes]:
"""Unpack the error from the buffer"""
error = self._unpack_response(payload_size)
code, msg = error.split(maxsplit=1)
return code, msg

def _unpack_message(self, payload_size: int) -> Tuple[int, int, bytes, bytes]:
def _unpack_message(self, payload_size: int) -> tuple[int, int, bytes, bytes]:
"""Unpack the message from the buffer.
:see: https://docs.python.org/3/library/struct.html
Expand All @@ -135,9 +137,7 @@ def _unpack_message(self, payload_size: int) -> Tuple[int, int, bytes, bytes]:
timestamp, attempts, id_, body = struct.unpack(fmt, self._buffer[start:end])
return timestamp, attempts, id_, body

def encode_command(
self, cmd: Union[str, bytes], *args: Any, data: Any = None
) -> bytes:
def encode_command(self, cmd: str | bytes, *args: Any, data: Any = None) -> bytes:
"""Encode command to bytes"""
_cmd = convert_to_bytes(cmd.upper().strip())
_args = [convert_to_bytes(a) for a in args]
Expand Down
Loading

0 comments on commit ee1ab75

Please sign in to comment.