Skip to content

Commit 62b8033

Browse files
committed
create topic reader from topic client
1 parent 538d6c7 commit 62b8033

File tree

4 files changed

+41
-123
lines changed

4 files changed

+41
-123
lines changed

tests/topics/test_topic_reader.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,6 @@
77
@pytest.mark.asyncio
88
class TestTopicWriterAsyncIO:
99
async def test_read_message(self, driver, topic_path, topic_with_messages, topic_consumer):
10-
reader = PublicAsyncIOReader(driver, TopicReaderSettings(
11-
consumer=topic_consumer,
12-
topic=topic_path,
13-
))
10+
reader = driver.topic_client.topic_reader(topic_consumer, topic_path)
1411

1512
assert await reader.receive_batch() is not None

ydb/_topic_reader/topic_reader.py

Lines changed: 0 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -32,104 +32,6 @@ def __init__(self, path, *, partitions: Union[None, int, List[int]] = None):
3232
self.partitions = partitions
3333

3434

35-
class ReaderAsyncIO(object):
36-
async def __aenter__(self):
37-
raise NotImplementedError()
38-
39-
async def __aexit__(self, exc_type, exc_val, exc_tb):
40-
raise NotImplementedError()
41-
42-
async def sessions_stat(self) -> List["SessionStat"]:
43-
"""
44-
Receive stat from the server
45-
46-
use asyncio.wait_for for wait with timeout.
47-
"""
48-
raise NotImplementedError()
49-
50-
def messages(
51-
self, *, timeout: Union[float, None] = None
52-
) -> AsyncIterable["PublicMessage"]:
53-
"""
54-
Block until receive new message
55-
56-
if no new messages in timeout seconds: stop iteration by raise StopAsyncIteration
57-
"""
58-
raise NotImplementedError()
59-
60-
async def receive_message(self) -> Union["PublicMessage", None]:
61-
"""
62-
Block until receive new message
63-
64-
use asyncio.wait_for for wait with timeout.
65-
"""
66-
raise NotImplementedError()
67-
68-
def batches(
69-
self,
70-
*,
71-
max_messages: Union[int, None] = None,
72-
max_bytes: Union[int, None] = None,
73-
timeout: Union[float, None] = None,
74-
) -> AsyncIterable["PublicBatch"]:
75-
"""
76-
Block until receive new batch.
77-
All messages in a batch from same partition.
78-
79-
if no new message in timeout seconds (default - infinite): stop iterations by raise StopIteration
80-
"""
81-
raise NotImplementedError()
82-
83-
async def receive_batch(
84-
self, *, max_messages: Union[int, None] = None, max_bytes: Union[int, None]
85-
) -> Union["PublicBatch", None]:
86-
"""
87-
Get one messages batch from reader.
88-
All messages in a batch from same partition.
89-
90-
use asyncio.wait_for for wait with timeout.
91-
"""
92-
raise NotImplementedError()
93-
94-
async def commit_on_exit(self, mess: "ICommittable") -> AsyncContextManager:
95-
"""
96-
commit the mess match/message if exit from context manager without exceptions
97-
98-
reader will close if exit from context manager with exception
99-
"""
100-
raise NotImplementedError()
101-
102-
def commit(self, mess: "ICommittable"):
103-
"""
104-
Write commit message to a buffer.
105-
106-
For the method no way check the commit result
107-
(for example if lost connection - commits will not re-send and committed messages will receive again)
108-
"""
109-
raise NotImplementedError()
110-
111-
async def commit_with_ack(
112-
self, mess: "ICommittable"
113-
) -> Union["CommitResult", List["CommitResult"]]:
114-
"""
115-
write commit message to a buffer and wait ack from the server.
116-
117-
use asyncio.wait_for for wait with timeout.
118-
"""
119-
raise NotImplementedError()
120-
121-
async def flush(self):
122-
"""
123-
force send all commit messages from internal buffers to server and wait acks for all of them.
124-
125-
use asyncio.wait_for for wait with timeout.
126-
"""
127-
raise NotImplementedError()
128-
129-
async def close(self):
130-
raise NotImplementedError()
131-
132-
13335
class Reader(object):
13436
def async_sessions_stat(self) -> concurrent.futures.Future:
13537
"""

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,22 @@ class TopicReaderError(YdbError):
2626

2727
class TopicReaderStreamClosedError(TopicReaderError):
2828
def __init__(self):
29-
super().__init__("Topic reader is closed")
29+
super().__init__("Topic reader stream is closed")
30+
31+
32+
class TopicReaderClosedError(TopicReaderError):
33+
def __init__(self):
34+
super().__init__("Topic reader is closed already")
3035

3136

3237
class PublicAsyncIOReader:
3338
_loop: asyncio.AbstractEventLoop
39+
_closed: bool
3440
_reconnector: ReaderReconnector
3541

3642
def __init__(self, driver: Driver, settings: PublicReaderSettings):
3743
self._loop = asyncio.get_running_loop()
44+
self._closed = False
3845
self._reconnector = ReaderReconnector(driver, settings)
3946

4047
async def __aenter__(self):
@@ -43,6 +50,10 @@ async def __aenter__(self):
4350
async def __aexit__(self, exc_type, exc_val, exc_tb):
4451
raise NotImplementedError()
4552

53+
def __del__(self):
54+
if not self._closed:
55+
self._loop.create_task(self.close(), name="close reader")
56+
4657
async def sessions_stat(self) -> typing.List["SessionStat"]:
4758
"""
4859
Receive stat from the server
@@ -132,7 +143,11 @@ async def flush(self):
132143
raise NotImplementedError()
133144

134145
async def close(self):
135-
raise NotImplementedError()
146+
if self._closed:
147+
raise TopicReaderClosedError()
148+
149+
self._closed = True
150+
await self._reconnector.close()
136151

137152

138153
class ReaderReconnector:

ydb/topic.py

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
1-
from typing import List, Callable, Union, Mapping, Any
1+
from typing import List, Callable, Union, Mapping, Any, Optional
22

33
from . import aio, Credentials
44
from ._topic_reader.topic_reader import (
55
PublicReaderSettings as TopicReaderSettings,
66
Reader as TopicReader,
7-
ReaderAsyncIO as TopicReaderAsyncIO,
87
Selector as TopicSelector,
98
Events as TopicReaderEvents,
109
RetryPolicy as TopicReaderRetryPolicy,
1110
StubEvent as TopicReaderStubEvent,
1211
)
1312

13+
from ._topic_reader.topic_reader_asyncio import (
14+
PublicAsyncIOReader as TopicReaderAsyncIO
15+
)
16+
from ._topic_wrapper.common import TokenGetterFuncType
1417

1518
from ._topic_writer.topic_writer import ( # noqa: F401
1619
Writer as TopicWriter,
@@ -32,26 +35,27 @@ def __init__(self, driver: aio.Driver, settings: "TopicClientSettings" = None):
3235

3336
def topic_reader(
3437
self,
35-
topic: Union[str, TopicSelector, List[Union[str, TopicSelector]]],
3638
consumer: str,
37-
commit_batch_time: Union[float, None] = 0.1,
38-
commit_batch_count: Union[int, None] = 1000,
39+
topic: str,
3940
buffer_size_bytes: int = 50 * 1024 * 1024,
40-
sync_commit: bool = False, # reader.commit(...) will wait commit ack from server
41-
on_commit: Callable[["TopicReaderEvents.OnCommit"], None] = None,
42-
on_get_partition_start_offset: Callable[
43-
["TopicReaderEvents.OnPartitionGetStartOffsetRequest"],
44-
"TopicReaderEvents.OnPartitionGetStartOffsetResponse",
45-
] = None,
46-
on_init_partition: Callable[["TopicReaderStubEvent"], None] = None,
47-
on_shutdown_partition: Callable[["TopicReaderStubEvent"], None] = None,
48-
decoder: Union[Mapping[int, Callable[[bytes], bytes]], None] = None,
49-
deserializer: Union[Callable[[bytes], Any], None] = None,
50-
one_attempt_connection_timeout: Union[float, None] = 1,
51-
connection_timeout: Union[float, None] = None,
52-
retry_policy: Union["TopicReaderRetryPolicy", None] = None,
41+
# on_commit: Callable[["Events.OnCommit"], None] = None
42+
# on_get_partition_start_offset: Callable[
43+
# ["Events.OnPartitionGetStartOffsetRequest"],
44+
# "Events.OnPartitionGetStartOffsetResponse",
45+
# ] = None
46+
# on_partition_session_start: Callable[["StubEvent"], None] = None
47+
# on_partition_session_stop: Callable[["StubEvent"], None] = None
48+
# on_partition_session_close: Callable[["StubEvent"], None] = None # todo?
49+
# decoder: Union[Mapping[int, Callable[[bytes], bytes]], None] = None
50+
# deserializer: Union[Callable[[bytes], Any], None] = None
51+
# one_attempt_connection_timeout: Union[float, None] = 1
52+
# connection_timeout: Union[float, None] = None
53+
# retry_policy: Union["RetryPolicy", None] = None
5354
) -> TopicReaderAsyncIO:
54-
raise NotImplementedError()
55+
args = locals()
56+
del args["self"]
57+
settings = TopicReaderSettings(**args)
58+
return TopicReaderAsyncIO(self._driver, settings)
5559

5660
def topic_writer(
5761
self,

0 commit comments

Comments
 (0)