Skip to content

Commit e8e1272

Browse files
committed
fix review comments
1 parent badb27a commit e8e1272

File tree

3 files changed

+67
-68
lines changed

3 files changed

+67
-68
lines changed

ydb/_grpc/grpcwrapper/ydb_topic.py

Lines changed: 43 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
)
3434

3535

36-
class Codec(int, IToPublic):
36+
class Codec(int, IToPublic, IFromPublic):
3737
CODEC_UNSPECIFIED = 0
3838
CODEC_RAW = 1
3939
CODEC_GZIP = 2
@@ -47,9 +47,13 @@ def from_proto_iterable(codecs: typing.Iterable[int]) -> List["Codec"]:
4747
def to_public(self) -> ydb_topic_public_types.PublicCodec:
4848
return ydb_topic_public_types.PublicCodec(int(self))
4949

50+
@staticmethod
51+
def from_public(codec: Union[ydb_topic_public_types.PublicCodec, int]) -> "Codec":
52+
return Codec(int(codec))
53+
5054

5155
@dataclass
52-
class SupportedCodecs(IToProto, IFromProto, IToPublic):
56+
class SupportedCodecs(IToProto, IFromProto, IToPublic, IFromPublic):
5357
codecs: List[Codec]
5458

5559
def to_proto(self) -> ydb_topic_pb2.SupportedCodecs:
@@ -69,6 +73,15 @@ def from_proto(msg: Optional[ydb_topic_pb2.SupportedCodecs]) -> "SupportedCodecs
6973
def to_public(self) -> List[ydb_topic_public_types.PublicCodec]:
7074
return list(map(Codec.to_public, self.codecs))
7175

76+
@staticmethod
77+
def from_public(
78+
codecs: Optional[List[Union[ydb_topic_public_types.PublicCodec, int]]]
79+
) -> Optional["SupportedCodecs"]:
80+
if codecs is None:
81+
return None
82+
83+
return SupportedCodecs(codecs=[Codec.from_public(codec) for codec in codecs])
84+
7285

7386
@dataclass(order=True)
7487
class OffsetsRange(IFromProto, IToProto):
@@ -886,17 +899,21 @@ def from_proto(
886899
@dataclass
887900
class AlterConsumer(IToProto, IFromPublic):
888901
name: str
889-
set_important: bool
890-
set_read_from: datetime.datetime
891-
set_supported_codecs: SupportedCodecs
892-
alter_attributes: Dict[str, str]
902+
set_important: Optional[bool]
903+
set_read_from: Optional[datetime.datetime]
904+
set_supported_codecs: Optional[SupportedCodecs]
905+
alter_attributes: Optional[Dict[str, str]]
893906

894907
def to_proto(self) -> ydb_topic_pb2.AlterConsumer:
908+
supported_codecs = None
909+
if self.set_supported_codecs is not None:
910+
supported_codecs = self.set_supported_codecs.to_proto()
911+
895912
return ydb_topic_pb2.AlterConsumer(
896913
name=self.name,
897914
set_important=self.set_important,
898915
set_read_from=proto_timestamp_from_datetime(self.set_read_from),
899-
set_supported_codecs=self.set_supported_codecs.to_proto(),
916+
set_supported_codecs=supported_codecs,
900917
alter_attributes=self.alter_attributes,
901918
)
902919

@@ -905,13 +922,11 @@ def from_public(alter_consumer: ydb_topic_public_types.PublicAlterConsumer) -> A
905922
if not alter_consumer:
906923
return None
907924

908-
supported_codecs = alter_consumer.set_supported_codecs if alter_consumer.set_supported_codecs else []
909-
910925
return AlterConsumer(
911926
name=alter_consumer.name,
912927
set_important=alter_consumer.set_important,
913928
set_read_from=alter_consumer.set_read_from,
914-
set_supported_codecs=SupportedCodecs(codecs=supported_codecs),
929+
set_supported_codecs=SupportedCodecs.from_public(alter_consumer.set_supported_codecs),
915930
alter_attributes=alter_consumer.alter_attributes,
916931
)
917932

@@ -936,16 +951,9 @@ def to_proto(self) -> ydb_topic_pb2.PartitioningSettings:
936951

937952

938953
@dataclass
939-
class AlterPartitioningSettings(IToProto, IFromProto):
940-
set_min_active_partitions: int
941-
set_partition_count_limit: int
942-
943-
@staticmethod
944-
def from_proto(msg: ydb_topic_pb2.AlterPartitioningSettings) -> "AlterPartitioningSettings":
945-
return AlterPartitioningSettings(
946-
set_min_active_partitions=msg.set_min_active_partitions,
947-
set_partition_count_limit=msg.set_partition_count_limit,
948-
)
954+
class AlterPartitioningSettings(IToProto):
955+
set_min_active_partitions: Optional[int]
956+
set_partition_count_limit: Optional[int]
949957

950958
def to_proto(self) -> ydb_topic_pb2.AlterPartitioningSettings:
951959
return ydb_topic_pb2.AlterPartitioningSettings(
@@ -1050,20 +1058,22 @@ class CreateTopicResult:
10501058
@dataclass
10511059
class AlterTopicRequest(IToProto, IFromPublic):
10521060
path: str
1053-
add_consumers: List["Consumer"]
1054-
alter_partitioning_settings: AlterPartitioningSettings
1055-
set_retention_period: datetime.timedelta
1056-
set_retention_storage_mb: int
1057-
set_supported_codecs: SupportedCodecs
1058-
set_partition_write_burst_bytes: typing.Optional[int]
1059-
set_partition_write_speed_bytes_per_second: typing.Optional[int]
1060-
alter_attributes: Dict[str, str]
1061-
alter_consumers: List[AlterConsumer]
1062-
drop_consumers: List[str]
1063-
set_metering_mode: "MeteringMode"
1061+
add_consumers: Optional[List["Consumer"]]
1062+
alter_partitioning_settings: Optional[AlterPartitioningSettings]
1063+
set_retention_period: Optional[datetime.timedelta]
1064+
set_retention_storage_mb: Optional[int]
1065+
set_supported_codecs: Optional[SupportedCodecs]
1066+
set_partition_write_burst_bytes: Optional[int]
1067+
set_partition_write_speed_bytes_per_second: Optional[int]
1068+
alter_attributes: Optional[Dict[str, str]]
1069+
alter_consumers: Optional[List[AlterConsumer]]
1070+
drop_consumers: Optional[List[str]]
1071+
set_metering_mode: Optional["MeteringMode"]
10641072

10651073
def to_proto(self) -> ydb_topic_pb2.AlterTopicRequest:
1066-
supported_codecs = self.set_supported_codecs.to_proto() if self.set_supported_codecs.codecs else None
1074+
supported_codecs = None
1075+
if self.set_supported_codecs is not None:
1076+
supported_codecs = self.set_supported_codecs.to_proto()
10671077

10681078
return ydb_topic_pb2.AlterTopicRequest(
10691079
path=self.path,
@@ -1098,8 +1108,6 @@ def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTop
10981108

10991109
drop_consumers = req.drop_consumers if req.drop_consumers else []
11001110

1101-
supported_codecs = req.set_supported_codecs if req.set_supported_codecs else []
1102-
11031111
return AlterTopicRequest(
11041112
path=req.path,
11051113
alter_partitioning_settings=AlterPartitioningSettings(
@@ -1109,9 +1117,7 @@ def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTop
11091117
add_consumers=add_consumers,
11101118
set_retention_period=req.set_retention_period,
11111119
set_retention_storage_mb=req.set_retention_storage_mb,
1112-
set_supported_codecs=SupportedCodecs(
1113-
codecs=supported_codecs,
1114-
),
1120+
set_supported_codecs=SupportedCodecs.from_public(req.set_supported_codecs),
11151121
set_partition_write_burst_bytes=req.set_partition_write_burst_bytes,
11161122
set_partition_write_speed_bytes_per_second=req.set_partition_write_speed_bytes_per_second,
11171123
alter_attributes=req.alter_attributes,
@@ -1121,11 +1127,6 @@ def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTop
11211127
)
11221128

11231129

1124-
@dataclass
1125-
class AlterTopicResult:
1126-
pass
1127-
1128-
11291130
@dataclass
11301131
class DescribeTopicRequest:
11311132
path: str

ydb/_grpc/grpcwrapper/ydb_topic_public_types.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ class PublicConsumer:
9393
@dataclass
9494
class PublicAlterConsumer:
9595
name: str
96-
set_important: bool = False
96+
set_important: Optional[bool] = None
9797
"""
9898
Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention.
9999
User should take care that such consumer never stalls, to prevent running out of disk space.
@@ -102,13 +102,13 @@ class PublicAlterConsumer:
102102
set_read_from: Optional[datetime.datetime] = None
103103
"All messages with smaller server written_at timestamp will be skipped."
104104

105-
set_supported_codecs: List[PublicCodec] = field(default_factory=lambda: list())
105+
set_supported_codecs: Optional[List[PublicCodec]] = None
106106
"""
107107
List of supported codecs by this consumer.
108108
supported_codecs on topic must be contained inside this list.
109109
"""
110110

111-
alter_attributes: Dict[str, str] = field(default_factory=lambda: dict())
111+
alter_attributes: Optional[Dict[str, str]] = None
112112
"Attributes of consumer"
113113

114114

ydb/_grpc/grpcwrapper/ydb_topic_test.py

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -60,27 +60,25 @@ def test_alter_topic_request_from_public_to_proto():
6060

6161
msg_dict = MessageToDict(request_proto, preserving_proto_field_name=True)
6262

63-
assert msg_dict["path"] == params["path"]
64-
assert len(msg_dict["add_consumers"]) == len(params["add_consumers"])
65-
assert len(msg_dict["alter_consumers"]) == len(params["alter_consumers"])
66-
assert len(msg_dict["drop_consumers"]) == len(params["drop_consumers"])
67-
assert msg_dict["alter_attributes"] == params["alter_attributes"]
68-
69-
assert (
70-
int(msg_dict["alter_partitioning_settings"]["set_min_active_partitions"]) == params["set_min_active_partitions"]
71-
)
72-
assert (
73-
int(msg_dict["alter_partitioning_settings"]["set_partition_count_limit"]) == params["set_partition_count_limit"]
74-
)
75-
76-
assert int(msg_dict["set_partition_write_burst_bytes"]) == params["set_partition_write_burst_bytes"]
77-
assert (
78-
int(msg_dict["set_partition_write_speed_bytes_per_second"])
79-
== params["set_partition_write_speed_bytes_per_second"]
80-
)
81-
assert msg_dict["set_retention_period"] == str(int(params["set_retention_period"].total_seconds())) + "s"
82-
assert int(msg_dict["set_retention_storage_mb"]) == params["set_retention_storage_mb"]
83-
84-
assert msg_dict["set_metering_mode"] == "METERING_MODE_RESERVED_CAPACITY"
63+
expected_dict = {
64+
"path": "topic_name",
65+
"alter_partitioning_settings": {"set_min_active_partitions": "2", "set_partition_count_limit": "4"},
66+
"set_retention_period": "2419200s",
67+
"set_retention_storage_mb": "4",
68+
"set_supported_codecs": {"codecs": [1, 2]},
69+
"set_partition_write_speed_bytes_per_second": "15",
70+
"set_partition_write_burst_bytes": "8",
71+
"alter_attributes": {"key": "value"},
72+
"add_consumers": [
73+
{"name": "new_consumer_1", "supported_codecs": {}},
74+
{"name": "new_consumer_2", "supported_codecs": {}},
75+
],
76+
"drop_consumers": ["redundant_consumer"],
77+
"alter_consumers": [
78+
{"name": "old_consumer_1"},
79+
{"name": "old_consumer_2"},
80+
],
81+
"set_metering_mode": "METERING_MODE_RESERVED_CAPACITY",
82+
}
8583

86-
assert msg_dict["set_supported_codecs"]["codecs"] == params["set_supported_codecs"]
84+
assert msg_dict == expected_dict

0 commit comments

Comments
 (0)