Skip to content

Commit badb27a

Browse files
committed
tests
1 parent a4588fb commit badb27a

File tree

4 files changed

+95
-8
lines changed

4 files changed

+95
-8
lines changed

tests/topics/test_control_plane.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,23 @@ async def test_describe_topic(self, driver, topic_path: str, topic_consumer):
3939

4040
assert has_consumer
4141

42-
async def test_alter_topic(self, driver, topic_path):
42+
async def test_alter_not_existed_topic(self, driver, topic_path):
4343
client = driver.topic_client
4444

45-
await client.alter_topic(topic_path)
46-
4745
with pytest.raises(issues.SchemeError):
4846
await client.alter_topic(topic_path + "-not-exist")
4947

48+
async def test_alter_existed_topic(self, driver, topic_path):
49+
client = driver.topic_client
50+
51+
topic_before = await client.describe_topic(topic_path)
52+
53+
target_min_active_partitions = topic_before.min_active_partitions + 1
54+
await client.alter_topic(topic_path, set_min_active_partitions=target_min_active_partitions)
55+
56+
topic_after = await client.describe_topic(topic_path)
57+
assert topic_after.min_active_partitions == target_min_active_partitions
58+
5059

5160
class TestTopicClientControlPlane:
5261
def test_create_topic(self, driver_sync, database):
@@ -81,10 +90,19 @@ def test_describe_topic(self, driver_sync, topic_path: str, topic_consumer):
8190

8291
assert has_consumer
8392

84-
def test_alter_topic(self, driver_sync, topic_path):
93+
def test_alter_not_existed_topic(self, driver_sync, topic_path):
8594
client = driver_sync.topic_client
8695

87-
client.alter_topic(topic_path)
88-
8996
with pytest.raises(issues.SchemeError):
9097
client.alter_topic(topic_path + "-not-exist")
98+
99+
def test_alter_existed_topic(self, driver_sync, topic_path):
100+
client = driver_sync.topic_client
101+
102+
topic_before = client.describe_topic(topic_path)
103+
104+
target_min_active_partitions = topic_before.min_active_partitions + 1
105+
client.alter_topic(topic_path, set_min_active_partitions=target_min_active_partitions)
106+
107+
topic_after = client.describe_topic(topic_path)
108+
assert topic_after.min_active_partitions == target_min_active_partitions

ydb/_grpc/grpcwrapper/ydb_topic.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1063,13 +1063,15 @@ class AlterTopicRequest(IToProto, IFromPublic):
10631063
set_metering_mode: "MeteringMode"
10641064

10651065
def to_proto(self) -> ydb_topic_pb2.AlterTopicRequest:
1066+
supported_codecs = self.set_supported_codecs.to_proto() if self.set_supported_codecs.codecs else None
1067+
10661068
return ydb_topic_pb2.AlterTopicRequest(
10671069
path=self.path,
10681070
add_consumers=[consumer.to_proto() for consumer in self.add_consumers],
10691071
alter_partitioning_settings=self.alter_partitioning_settings.to_proto(),
10701072
set_retention_period=proto_duration_from_timedelta(self.set_retention_period),
10711073
set_retention_storage_mb=self.set_retention_storage_mb,
1072-
set_supported_codecs=self.set_supported_codecs.to_proto(),
1074+
set_supported_codecs=supported_codecs,
10731075
set_partition_write_burst_bytes=self.set_partition_write_burst_bytes,
10741076
set_partition_write_speed_bytes_per_second=self.set_partition_write_speed_bytes_per_second,
10751077
alter_attributes=self.alter_attributes,

ydb/_grpc/grpcwrapper/ydb_topic_public_types.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class AlterTopicRequestParams:
3737
set_partition_count_limit: Optional[int]
3838
add_consumers: Optional[List[Union["PublicConsumer", str]]]
3939
alter_consumers: Optional[List[Union["PublicAlterConsumer", str]]]
40-
drop_consumers: Optional[List[str]] # TODO: clarify
40+
drop_consumers: Optional[List[str]]
4141
alter_attributes: Optional[Dict[str, str]]
4242
set_metering_mode: Optional["PublicMeteringMode"]
4343
set_partition_write_speed_bytes_per_second: Optional[int]

ydb/_grpc/grpcwrapper/ydb_topic_test.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,15 @@
1+
import datetime
2+
3+
from google.protobuf.json_format import MessageToDict
4+
15
from ydb._grpc.grpcwrapper.ydb_topic import OffsetsRange
6+
from .ydb_topic import AlterTopicRequest
7+
from .ydb_topic_public_types import (
8+
AlterTopicRequestParams,
9+
PublicAlterConsumer,
10+
PublicConsumer,
11+
PublicCodec,
12+
)
213

314

415
def test_offsets_range_intersected():
@@ -17,3 +28,59 @@ def test_offsets_range_intersected():
1728
]:
1829
assert OffsetsRange(test[0], test[1]).is_intersected_with(OffsetsRange(test[2], test[3]))
1930
assert OffsetsRange(test[2], test[3]).is_intersected_with(OffsetsRange(test[0], test[1]))
31+
32+
33+
def test_alter_topic_request_from_public_to_proto():
34+
# Specify all fields with all possible input ways
35+
params = {
36+
"path": "topic_name",
37+
"add_consumers": [
38+
"new_consumer_1",
39+
PublicConsumer("new_consumer_2"),
40+
],
41+
"alter_consumers": [
42+
"old_consumer_1",
43+
PublicAlterConsumer("old_consumer_2"),
44+
],
45+
"drop_consumers": ["redundant_consumer"],
46+
"set_retention_period": datetime.timedelta(weeks=4),
47+
"set_retention_storage_mb": 4,
48+
"set_supported_codecs": [1, PublicCodec(2)],
49+
"set_partition_write_burst_bytes": 8,
50+
"set_partition_write_speed_bytes_per_second": 15,
51+
"alter_attributes": {"key": "value"},
52+
"set_metering_mode": 1,
53+
"set_min_active_partitions": 2,
54+
"set_partition_count_limit": 4,
55+
}
56+
57+
params_public = AlterTopicRequestParams(**params)
58+
request = AlterTopicRequest.from_public(params_public)
59+
request_proto = request.to_proto()
60+
61+
msg_dict = MessageToDict(request_proto, preserving_proto_field_name=True)
62+
63+
assert msg_dict["path"] == params["path"]
64+
assert len(msg_dict["add_consumers"]) == len(params["add_consumers"])
65+
assert len(msg_dict["alter_consumers"]) == len(params["alter_consumers"])
66+
assert len(msg_dict["drop_consumers"]) == len(params["drop_consumers"])
67+
assert msg_dict["alter_attributes"] == params["alter_attributes"]
68+
69+
assert (
70+
int(msg_dict["alter_partitioning_settings"]["set_min_active_partitions"]) == params["set_min_active_partitions"]
71+
)
72+
assert (
73+
int(msg_dict["alter_partitioning_settings"]["set_partition_count_limit"]) == params["set_partition_count_limit"]
74+
)
75+
76+
assert int(msg_dict["set_partition_write_burst_bytes"]) == params["set_partition_write_burst_bytes"]
77+
assert (
78+
int(msg_dict["set_partition_write_speed_bytes_per_second"])
79+
== params["set_partition_write_speed_bytes_per_second"]
80+
)
81+
assert msg_dict["set_retention_period"] == str(int(params["set_retention_period"].total_seconds())) + "s"
82+
assert int(msg_dict["set_retention_storage_mb"]) == params["set_retention_storage_mb"]
83+
84+
assert msg_dict["set_metering_mode"] == "METERING_MODE_RESERVED_CAPACITY"
85+
86+
assert msg_dict["set_supported_codecs"]["codecs"] == params["set_supported_codecs"]

0 commit comments

Comments
 (0)