Skip to content

Commit 3eeef84

Browse files
authored
Merge pull request #448 from ydb-platform/impl/alter_topic
Alter topic feature
2 parents 36ef09b + e8e1272 commit 3eeef84

File tree

6 files changed

+371
-2
lines changed

6 files changed

+371
-2
lines changed

tests/topics/test_control_plane.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,23 @@ async def test_describe_topic(self, driver, topic_path: str, topic_consumer):
3939

4040
assert has_consumer
4141

42+
async def test_alter_not_existed_topic(self, driver, topic_path):
43+
client = driver.topic_client
44+
45+
with pytest.raises(issues.SchemeError):
46+
await client.alter_topic(topic_path + "-not-exist")
47+
48+
async def test_alter_existed_topic(self, driver, topic_path):
49+
client = driver.topic_client
50+
51+
topic_before = await client.describe_topic(topic_path)
52+
53+
target_min_active_partitions = topic_before.min_active_partitions + 1
54+
await client.alter_topic(topic_path, set_min_active_partitions=target_min_active_partitions)
55+
56+
topic_after = await client.describe_topic(topic_path)
57+
assert topic_after.min_active_partitions == target_min_active_partitions
58+
4259

4360
class TestTopicClientControlPlane:
4461
def test_create_topic(self, driver_sync, database):
@@ -72,3 +89,20 @@ def test_describe_topic(self, driver_sync, topic_path: str, topic_consumer):
7289
break
7390

7491
assert has_consumer
92+
93+
def test_alter_not_existed_topic(self, driver_sync, topic_path):
94+
client = driver_sync.topic_client
95+
96+
with pytest.raises(issues.SchemeError):
97+
client.alter_topic(topic_path + "-not-exist")
98+
99+
def test_alter_existed_topic(self, driver_sync, topic_path):
100+
client = driver_sync.topic_client
101+
102+
topic_before = client.describe_topic(topic_path)
103+
104+
target_min_active_partitions = topic_before.min_active_partitions + 1
105+
client.alter_topic(topic_path, set_min_active_partitions=target_min_active_partitions)
106+
107+
topic_after = client.describe_topic(topic_path)
108+
assert topic_after.min_active_partitions == target_min_active_partitions

ydb/_apis.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ class TopicService(object):
123123

124124
CreateTopic = "CreateTopic"
125125
DescribeTopic = "DescribeTopic"
126+
AlterTopic = "AlterTopic"
126127
DropTopic = "DropTopic"
127128
StreamRead = "StreamRead"
128129
StreamWrite = "StreamWrite"

ydb/_grpc/grpcwrapper/ydb_topic.py

Lines changed: 134 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
)
3434

3535

36-
class Codec(int, IToPublic):
36+
class Codec(int, IToPublic, IFromPublic):
3737
CODEC_UNSPECIFIED = 0
3838
CODEC_RAW = 1
3939
CODEC_GZIP = 2
@@ -47,9 +47,13 @@ def from_proto_iterable(codecs: typing.Iterable[int]) -> List["Codec"]:
4747
def to_public(self) -> ydb_topic_public_types.PublicCodec:
4848
return ydb_topic_public_types.PublicCodec(int(self))
4949

50+
@staticmethod
51+
def from_public(codec: Union[ydb_topic_public_types.PublicCodec, int]) -> "Codec":
52+
return Codec(int(codec))
53+
5054

5155
@dataclass
52-
class SupportedCodecs(IToProto, IFromProto, IToPublic):
56+
class SupportedCodecs(IToProto, IFromProto, IToPublic, IFromPublic):
5357
codecs: List[Codec]
5458

5559
def to_proto(self) -> ydb_topic_pb2.SupportedCodecs:
@@ -69,6 +73,15 @@ def from_proto(msg: Optional[ydb_topic_pb2.SupportedCodecs]) -> "SupportedCodecs
6973
def to_public(self) -> List[ydb_topic_public_types.PublicCodec]:
7074
return list(map(Codec.to_public, self.codecs))
7175

76+
@staticmethod
77+
def from_public(
78+
codecs: Optional[List[Union[ydb_topic_public_types.PublicCodec, int]]]
79+
) -> Optional["SupportedCodecs"]:
80+
if codecs is None:
81+
return None
82+
83+
return SupportedCodecs(codecs=[Codec.from_public(codec) for codec in codecs])
84+
7285

7386
@dataclass(order=True)
7487
class OffsetsRange(IFromProto, IToProto):
@@ -883,6 +896,41 @@ def from_proto(
883896
)
884897

885898

899+
@dataclass
900+
class AlterConsumer(IToProto, IFromPublic):
901+
name: str
902+
set_important: Optional[bool]
903+
set_read_from: Optional[datetime.datetime]
904+
set_supported_codecs: Optional[SupportedCodecs]
905+
alter_attributes: Optional[Dict[str, str]]
906+
907+
def to_proto(self) -> ydb_topic_pb2.AlterConsumer:
908+
supported_codecs = None
909+
if self.set_supported_codecs is not None:
910+
supported_codecs = self.set_supported_codecs.to_proto()
911+
912+
return ydb_topic_pb2.AlterConsumer(
913+
name=self.name,
914+
set_important=self.set_important,
915+
set_read_from=proto_timestamp_from_datetime(self.set_read_from),
916+
set_supported_codecs=supported_codecs,
917+
alter_attributes=self.alter_attributes,
918+
)
919+
920+
@staticmethod
921+
def from_public(alter_consumer: ydb_topic_public_types.PublicAlterConsumer) -> AlterConsumer:
922+
if not alter_consumer:
923+
return None
924+
925+
return AlterConsumer(
926+
name=alter_consumer.name,
927+
set_important=alter_consumer.set_important,
928+
set_read_from=alter_consumer.set_read_from,
929+
set_supported_codecs=SupportedCodecs.from_public(alter_consumer.set_supported_codecs),
930+
alter_attributes=alter_consumer.alter_attributes,
931+
)
932+
933+
886934
@dataclass
887935
class PartitioningSettings(IToProto, IFromProto):
888936
min_active_partitions: int
@@ -902,6 +950,18 @@ def to_proto(self) -> ydb_topic_pb2.PartitioningSettings:
902950
)
903951

904952

953+
@dataclass
954+
class AlterPartitioningSettings(IToProto):
955+
set_min_active_partitions: Optional[int]
956+
set_partition_count_limit: Optional[int]
957+
958+
def to_proto(self) -> ydb_topic_pb2.AlterPartitioningSettings:
959+
return ydb_topic_pb2.AlterPartitioningSettings(
960+
set_min_active_partitions=self.set_min_active_partitions,
961+
set_partition_count_limit=self.set_partition_count_limit,
962+
)
963+
964+
905965
class MeteringMode(int, IFromProto, IFromPublic, IToPublic):
906966
UNSPECIFIED = 0
907967
RESERVED_CAPACITY = 1
@@ -995,6 +1055,78 @@ class CreateTopicResult:
9951055
pass
9961056

9971057

1058+
@dataclass
1059+
class AlterTopicRequest(IToProto, IFromPublic):
1060+
path: str
1061+
add_consumers: Optional[List["Consumer"]]
1062+
alter_partitioning_settings: Optional[AlterPartitioningSettings]
1063+
set_retention_period: Optional[datetime.timedelta]
1064+
set_retention_storage_mb: Optional[int]
1065+
set_supported_codecs: Optional[SupportedCodecs]
1066+
set_partition_write_burst_bytes: Optional[int]
1067+
set_partition_write_speed_bytes_per_second: Optional[int]
1068+
alter_attributes: Optional[Dict[str, str]]
1069+
alter_consumers: Optional[List[AlterConsumer]]
1070+
drop_consumers: Optional[List[str]]
1071+
set_metering_mode: Optional["MeteringMode"]
1072+
1073+
def to_proto(self) -> ydb_topic_pb2.AlterTopicRequest:
1074+
supported_codecs = None
1075+
if self.set_supported_codecs is not None:
1076+
supported_codecs = self.set_supported_codecs.to_proto()
1077+
1078+
return ydb_topic_pb2.AlterTopicRequest(
1079+
path=self.path,
1080+
add_consumers=[consumer.to_proto() for consumer in self.add_consumers],
1081+
alter_partitioning_settings=self.alter_partitioning_settings.to_proto(),
1082+
set_retention_period=proto_duration_from_timedelta(self.set_retention_period),
1083+
set_retention_storage_mb=self.set_retention_storage_mb,
1084+
set_supported_codecs=supported_codecs,
1085+
set_partition_write_burst_bytes=self.set_partition_write_burst_bytes,
1086+
set_partition_write_speed_bytes_per_second=self.set_partition_write_speed_bytes_per_second,
1087+
alter_attributes=self.alter_attributes,
1088+
alter_consumers=[consumer.to_proto() for consumer in self.alter_consumers],
1089+
drop_consumers=list(self.drop_consumers),
1090+
set_metering_mode=self.set_metering_mode,
1091+
)
1092+
1093+
@staticmethod
1094+
def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTopicRequest:
1095+
add_consumers = []
1096+
if req.add_consumers:
1097+
for consumer in req.add_consumers:
1098+
if isinstance(consumer, str):
1099+
consumer = ydb_topic_public_types.PublicConsumer(name=consumer)
1100+
add_consumers.append(Consumer.from_public(consumer))
1101+
1102+
alter_consumers = []
1103+
if req.alter_consumers:
1104+
for consumer in req.alter_consumers:
1105+
if isinstance(consumer, str):
1106+
consumer = ydb_topic_public_types.PublicAlterConsumer(name=consumer)
1107+
alter_consumers.append(AlterConsumer.from_public(consumer))
1108+
1109+
drop_consumers = req.drop_consumers if req.drop_consumers else []
1110+
1111+
return AlterTopicRequest(
1112+
path=req.path,
1113+
alter_partitioning_settings=AlterPartitioningSettings(
1114+
set_min_active_partitions=req.set_min_active_partitions,
1115+
set_partition_count_limit=req.set_partition_count_limit,
1116+
),
1117+
add_consumers=add_consumers,
1118+
set_retention_period=req.set_retention_period,
1119+
set_retention_storage_mb=req.set_retention_storage_mb,
1120+
set_supported_codecs=SupportedCodecs.from_public(req.set_supported_codecs),
1121+
set_partition_write_burst_bytes=req.set_partition_write_burst_bytes,
1122+
set_partition_write_speed_bytes_per_second=req.set_partition_write_speed_bytes_per_second,
1123+
alter_attributes=req.alter_attributes,
1124+
alter_consumers=alter_consumers,
1125+
drop_consumers=drop_consumers,
1126+
set_metering_mode=MeteringMode.from_public(req.set_metering_mode),
1127+
)
1128+
1129+
9981130
@dataclass
9991131
class DescribeTopicRequest:
10001132
path: str

ydb/_grpc/grpcwrapper/ydb_topic_public_types.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,23 @@ class CreateTopicRequestParams:
3030
metering_mode: Optional["PublicMeteringMode"]
3131

3232

33+
@dataclass
34+
class AlterTopicRequestParams:
35+
path: str
36+
set_min_active_partitions: Optional[int]
37+
set_partition_count_limit: Optional[int]
38+
add_consumers: Optional[List[Union["PublicConsumer", str]]]
39+
alter_consumers: Optional[List[Union["PublicAlterConsumer", str]]]
40+
drop_consumers: Optional[List[str]]
41+
alter_attributes: Optional[Dict[str, str]]
42+
set_metering_mode: Optional["PublicMeteringMode"]
43+
set_partition_write_speed_bytes_per_second: Optional[int]
44+
set_partition_write_burst_bytes: Optional[int]
45+
set_retention_period: Optional[datetime.timedelta]
46+
set_retention_storage_mb: Optional[int]
47+
set_supported_codecs: Optional[List[Union["PublicCodec", int]]]
48+
49+
3350
class PublicCodec(int):
3451
"""
3552
Codec value may contain any int number.
@@ -73,6 +90,28 @@ class PublicConsumer:
7390
"Attributes of consumer"
7491

7592

93+
@dataclass
94+
class PublicAlterConsumer:
95+
name: str
96+
set_important: Optional[bool] = None
97+
"""
98+
Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention.
99+
User should take care that such consumer never stalls, to prevent running out of disk space.
100+
"""
101+
102+
set_read_from: Optional[datetime.datetime] = None
103+
"All messages with smaller server written_at timestamp will be skipped."
104+
105+
set_supported_codecs: Optional[List[PublicCodec]] = None
106+
"""
107+
List of supported codecs by this consumer.
108+
supported_codecs on topic must be contained inside this list.
109+
"""
110+
111+
alter_attributes: Optional[Dict[str, str]] = None
112+
"Attributes of consumer"
113+
114+
76115
@dataclass
77116
class DropTopicRequestParams(IToProto):
78117
path: str

ydb/_grpc/grpcwrapper/ydb_topic_test.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,15 @@
1+
import datetime
2+
3+
from google.protobuf.json_format import MessageToDict
4+
15
from ydb._grpc.grpcwrapper.ydb_topic import OffsetsRange
6+
from .ydb_topic import AlterTopicRequest
7+
from .ydb_topic_public_types import (
8+
AlterTopicRequestParams,
9+
PublicAlterConsumer,
10+
PublicConsumer,
11+
PublicCodec,
12+
)
213

314

415
def test_offsets_range_intersected():
@@ -17,3 +28,57 @@ def test_offsets_range_intersected():
1728
]:
1829
assert OffsetsRange(test[0], test[1]).is_intersected_with(OffsetsRange(test[2], test[3]))
1930
assert OffsetsRange(test[2], test[3]).is_intersected_with(OffsetsRange(test[0], test[1]))
31+
32+
33+
def test_alter_topic_request_from_public_to_proto():
34+
# Specify all fields with all possible input ways
35+
params = {
36+
"path": "topic_name",
37+
"add_consumers": [
38+
"new_consumer_1",
39+
PublicConsumer("new_consumer_2"),
40+
],
41+
"alter_consumers": [
42+
"old_consumer_1",
43+
PublicAlterConsumer("old_consumer_2"),
44+
],
45+
"drop_consumers": ["redundant_consumer"],
46+
"set_retention_period": datetime.timedelta(weeks=4),
47+
"set_retention_storage_mb": 4,
48+
"set_supported_codecs": [1, PublicCodec(2)],
49+
"set_partition_write_burst_bytes": 8,
50+
"set_partition_write_speed_bytes_per_second": 15,
51+
"alter_attributes": {"key": "value"},
52+
"set_metering_mode": 1,
53+
"set_min_active_partitions": 2,
54+
"set_partition_count_limit": 4,
55+
}
56+
57+
params_public = AlterTopicRequestParams(**params)
58+
request = AlterTopicRequest.from_public(params_public)
59+
request_proto = request.to_proto()
60+
61+
msg_dict = MessageToDict(request_proto, preserving_proto_field_name=True)
62+
63+
expected_dict = {
64+
"path": "topic_name",
65+
"alter_partitioning_settings": {"set_min_active_partitions": "2", "set_partition_count_limit": "4"},
66+
"set_retention_period": "2419200s",
67+
"set_retention_storage_mb": "4",
68+
"set_supported_codecs": {"codecs": [1, 2]},
69+
"set_partition_write_speed_bytes_per_second": "15",
70+
"set_partition_write_burst_bytes": "8",
71+
"alter_attributes": {"key": "value"},
72+
"add_consumers": [
73+
{"name": "new_consumer_1", "supported_codecs": {}},
74+
{"name": "new_consumer_2", "supported_codecs": {}},
75+
],
76+
"drop_consumers": ["redundant_consumer"],
77+
"alter_consumers": [
78+
{"name": "old_consumer_1"},
79+
{"name": "old_consumer_2"},
80+
],
81+
"set_metering_mode": "METERING_MODE_RESERVED_CAPACITY",
82+
}
83+
84+
assert msg_dict == expected_dict

0 commit comments

Comments
 (0)