Skip to content

Commit b81b4d7

Browse files
authored
Merge pull request #476 from Eistern/feature/alert-topic-auto-partitioning
Implement SDK support for Topic Auto-Partitioning Settings
2 parents f353a9c + 77f469b commit b81b4d7

9 files changed

+399
-8
lines changed

topic/src/main/java/tech/ydb/topic/description/ConsumerDescription.java

-1
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,4 @@ public Consumer getConsumer() {
2828
public List<ConsumerPartitionInfo> getPartitions() {
2929
return partitions;
3030
}
31-
3231
}

topic/src/main/java/tech/ydb/topic/description/TopicDescription.java

-1
Original file line numberDiff line numberDiff line change
@@ -179,5 +179,4 @@ public TopicDescription build() {
179179
return new TopicDescription(this);
180180
}
181181
}
182-
183182
}

topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java

+90-2
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,12 @@
3535
import tech.ydb.topic.read.SyncReader;
3636
import tech.ydb.topic.read.impl.AsyncReaderImpl;
3737
import tech.ydb.topic.read.impl.SyncReaderImpl;
38+
import tech.ydb.topic.settings.AlterAutoPartitioningWriteStrategySettings;
3839
import tech.ydb.topic.settings.AlterConsumerSettings;
3940
import tech.ydb.topic.settings.AlterPartitioningSettings;
4041
import tech.ydb.topic.settings.AlterTopicSettings;
42+
import tech.ydb.topic.settings.AutoPartitioningStrategy;
43+
import tech.ydb.topic.settings.AutoPartitioningWriteStrategySettings;
4144
import tech.ydb.topic.settings.CommitOffsetSettings;
4245
import tech.ydb.topic.settings.CreateTopicSettings;
4346
import tech.ydb.topic.settings.DescribeConsumerSettings;
@@ -104,7 +107,23 @@ public CompletableFuture<Status> createTopic(String path, CreateTopicSettings se
104107
if (partitioningSettings != null) {
105108
requestBuilder.setPartitioningSettings(YdbTopic.PartitioningSettings.newBuilder()
106109
.setMinActivePartitions(partitioningSettings.getMinActivePartitions())
107-
.setPartitionCountLimit(partitioningSettings.getPartitionCountLimit()));
110+
.setPartitionCountLimit(partitioningSettings.getPartitionCountLimit())
111+
.setAutoPartitioningSettings(YdbTopic.AutoPartitioningSettings.newBuilder()
112+
.setStrategy(toProto(partitioningSettings.getAutoPartitioningStrategy()))));
113+
114+
AutoPartitioningWriteStrategySettings writeStrategySettings = partitioningSettings
115+
.getWriteStrategySettings();
116+
117+
if (writeStrategySettings != null) {
118+
requestBuilder.getPartitioningSettingsBuilder().getAutoPartitioningSettingsBuilder()
119+
.setPartitionWriteSpeed(YdbTopic.AutoPartitioningWriteSpeedStrategy.newBuilder()
120+
.setStabilizationWindow(ProtobufUtils.durationToProto(
121+
writeStrategySettings.getStabilizationWindow()
122+
))
123+
.setDownUtilizationPercent(writeStrategySettings.getDownUtilizationPercent())
124+
.setUpUtilizationPercent(writeStrategySettings.getUpUtilizationPercent())
125+
);
126+
}
108127
}
109128

110129
Duration retentionPeriod = settings.getRetentionPeriod();
@@ -145,6 +164,30 @@ public CompletableFuture<Status> alterTopic(String path, AlterTopicSettings sett
145164
if (partitionCountLimit != null) {
146165
builder.setSetPartitionCountLimit(partitionCountLimit);
147166
}
167+
AutoPartitioningStrategy autoPartitioningStrategy = partitioningSettings.getAutoPartitioningStrategy();
168+
if (autoPartitioningStrategy != null) {
169+
YdbTopic.AutoPartitioningStrategy protoReference = toProto(autoPartitioningStrategy);
170+
builder.getAlterAutoPartitioningSettingsBuilder().setSetStrategy(protoReference);
171+
}
172+
AlterAutoPartitioningWriteStrategySettings writeStrategySettings = partitioningSettings
173+
.getWriteStrategySettings();
174+
if (writeStrategySettings != null) {
175+
Duration stabilizationWindow = writeStrategySettings.getStabilizationWindow();
176+
if (stabilizationWindow != null) {
177+
builder.getAlterAutoPartitioningSettingsBuilder().getSetPartitionWriteSpeedBuilder()
178+
.setSetStabilizationWindow(ProtobufUtils.durationToProto(stabilizationWindow));
179+
}
180+
Integer upUtilizationPercent = writeStrategySettings.getUpUtilizationPercent();
181+
if (upUtilizationPercent != null) {
182+
builder.getAlterAutoPartitioningSettingsBuilder().getSetPartitionWriteSpeedBuilder()
183+
.setSetUpUtilizationPercent(upUtilizationPercent);
184+
}
185+
Integer downUtilizationPercent = writeStrategySettings.getDownUtilizationPercent();
186+
if (downUtilizationPercent != null) {
187+
builder.getAlterAutoPartitioningSettingsBuilder().getSetPartitionWriteSpeedBuilder()
188+
.setSetDownUtilizationPercent(downUtilizationPercent);
189+
}
190+
}
148191
requestBuilder.setAlterPartitioningSettings(builder);
149192
}
150193

@@ -273,11 +316,26 @@ private TopicDescription mapDescribeTopic(YdbTopic.DescribeTopicResult result) {
273316
.setMeteringMode(fromProto(result.getMeteringMode()));
274317

275318
YdbTopic.PartitioningSettings partitioningSettings = result.getPartitioningSettings();
276-
description.setPartitioningSettings(PartitioningSettings.newBuilder()
319+
YdbTopic.AutoPartitioningSettings autoPartitioningSettings = partitioningSettings.getAutoPartitioningSettings();
320+
YdbTopic.AutoPartitioningStrategy autoPartitioningStrategy = autoPartitioningSettings.getStrategy();
321+
322+
PartitioningSettings.Builder partitioningDescription = PartitioningSettings.newBuilder()
277323
.setMinActivePartitions(partitioningSettings.getMinActivePartitions())
278324
.setPartitionCountLimit(partitioningSettings.getPartitionCountLimit())
325+
.setAutoPartitioningStrategy(fromProto(autoPartitioningStrategy));
326+
327+
YdbTopic.AutoPartitioningWriteSpeedStrategy partitionWriteSpeed = autoPartitioningSettings
328+
.getPartitionWriteSpeed();
329+
partitioningDescription.setWriteStrategySettings(AutoPartitioningWriteStrategySettings.newBuilder()
330+
.setStabilizationWindow(ProtobufUtils.protoToDuration(
331+
partitionWriteSpeed.getStabilizationWindow()
332+
))
333+
.setUpUtilizationPercent(partitionWriteSpeed.getUpUtilizationPercent())
334+
.setDownUtilizationPercent(partitionWriteSpeed.getDownUtilizationPercent())
279335
.build());
280336

337+
description.setPartitioningSettings(partitioningDescription.build());
338+
281339
List<PartitionInfo> partitions = new ArrayList<>();
282340
for (YdbTopic.DescribeTopicResult.PartitionInfo partition : result.getPartitionsList()) {
283341
PartitionInfo.Builder partitionBuilder = PartitionInfo.newBuilder()
@@ -391,6 +449,36 @@ private static YdbTopic.SupportedCodecs toProto(SupportedCodecs supportedCodecs)
391449
return codecsBuilder.build();
392450
}
393451

452+
private static AutoPartitioningStrategy fromProto(YdbTopic.AutoPartitioningStrategy autoPartitioningStrategy) {
453+
switch (autoPartitioningStrategy) {
454+
case AUTO_PARTITIONING_STRATEGY_PAUSED:
455+
return AutoPartitioningStrategy.PAUSED;
456+
case AUTO_PARTITIONING_STRATEGY_SCALE_UP:
457+
return AutoPartitioningStrategy.SCALE_UP;
458+
case AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN:
459+
return AutoPartitioningStrategy.SCALE_UP_AND_DOWN;
460+
case AUTO_PARTITIONING_STRATEGY_DISABLED:
461+
return AutoPartitioningStrategy.DISABLED;
462+
default:
463+
return null;
464+
}
465+
}
466+
467+
private static YdbTopic.AutoPartitioningStrategy toProto(AutoPartitioningStrategy autoPartitioningStrategy) {
468+
switch (autoPartitioningStrategy) {
469+
case PAUSED:
470+
return YdbTopic.AutoPartitioningStrategy.AUTO_PARTITIONING_STRATEGY_PAUSED;
471+
case SCALE_UP:
472+
return YdbTopic.AutoPartitioningStrategy.AUTO_PARTITIONING_STRATEGY_SCALE_UP;
473+
case SCALE_UP_AND_DOWN:
474+
return YdbTopic.AutoPartitioningStrategy.AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN;
475+
case DISABLED:
476+
return YdbTopic.AutoPartitioningStrategy.AUTO_PARTITIONING_STRATEGY_DISABLED;
477+
default:
478+
throw new IllegalArgumentException("Unknown auto partitioning strategy: " + autoPartitioningStrategy);
479+
}
480+
}
481+
394482
@Override
395483
public void close() {
396484
logger.debug("TopicClientImpl.close() is called");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package tech.ydb.topic.settings;
2+
3+
import java.time.Duration;
4+
5+
import javax.annotation.Nullable;
6+
7+
public class AlterAutoPartitioningWriteStrategySettings {
8+
@Nullable
9+
private final Duration stabilizationWindow;
10+
@Nullable
11+
private final Integer upUtilizationPercent;
12+
@Nullable
13+
private final Integer downUtilizationPercent;
14+
15+
public AlterAutoPartitioningWriteStrategySettings(Builder builder) {
16+
this.stabilizationWindow = builder.stabilizationWindow;
17+
this.upUtilizationPercent = builder.upUtilizationPercent;
18+
this.downUtilizationPercent = builder.downUtilizationPercent;
19+
}
20+
21+
@Nullable
22+
public Duration getStabilizationWindow() {
23+
return stabilizationWindow;
24+
}
25+
26+
@Nullable
27+
public Integer getUpUtilizationPercent() {
28+
return upUtilizationPercent;
29+
}
30+
31+
@Nullable
32+
public Integer getDownUtilizationPercent() {
33+
return downUtilizationPercent;
34+
}
35+
36+
public static Builder newBuilder() {
37+
return new Builder();
38+
}
39+
40+
public static class Builder {
41+
private Duration stabilizationWindow = null;
42+
private Integer upUtilizationPercent = null;
43+
private Integer downUtilizationPercent = null;
44+
45+
/**
46+
* @param stabilizationWindow Duration used by the auto partitioning algorithm to define if the partition must be split.
47+
* Default value is 5 minutes.
48+
* @return strategy builder
49+
*/
50+
public Builder setStabilizationWindow(Duration stabilizationWindow) {
51+
this.stabilizationWindow = stabilizationWindow;
52+
return this;
53+
}
54+
55+
/**
56+
* @param upUtilizationPercent Upper level of partition quota utilization after which the partition should be split.
57+
* Default value is 90%.
58+
* @return strategy builder
59+
*/
60+
public Builder setUpUtilizationPercent(int upUtilizationPercent) {
61+
this.upUtilizationPercent = upUtilizationPercent;
62+
return this;
63+
}
64+
65+
/**
66+
* @param downUtilizationPercent Lower level of partition quota utilization
67+
* after which the partition should be merged with the other one.
68+
* Default value is 30%.
69+
* @return strategy builder
70+
*/
71+
public Builder setDownUtilizationPercent(int downUtilizationPercent) {
72+
this.downUtilizationPercent = downUtilizationPercent;
73+
return this;
74+
}
75+
76+
public AlterAutoPartitioningWriteStrategySettings build() {
77+
return new AlterAutoPartitioningWriteStrategySettings(this);
78+
}
79+
}
80+
}

topic/src/main/java/tech/ydb/topic/settings/AlterPartitioningSettings.java

+40
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,16 @@ public class AlterPartitioningSettings {
1010
private final Long minActivePartitions;
1111
@Nullable
1212
private final Long partitionCountLimit;
13+
@Nullable
14+
private final AutoPartitioningStrategy autoPartitioningStrategy;
15+
@Nullable
16+
private final AlterAutoPartitioningWriteStrategySettings writeStrategySettings;
1317

1418
private AlterPartitioningSettings(Builder builder) {
1519
this.minActivePartitions = builder.minActivePartitions;
1620
this.partitionCountLimit = builder.partitionCountLimit;
21+
this.autoPartitioningStrategy = builder.autoPartitioningStrategy;
22+
this.writeStrategySettings = builder.writeStrategySettings;
1723
}
1824

1925
public static Builder newBuilder() {
@@ -30,12 +36,24 @@ public Long getPartitionCountLimit() {
3036
return partitionCountLimit;
3137
}
3238

39+
@Nullable
40+
public AutoPartitioningStrategy getAutoPartitioningStrategy() {
41+
return autoPartitioningStrategy;
42+
}
43+
44+
@Nullable
45+
public AlterAutoPartitioningWriteStrategySettings getWriteStrategySettings() {
46+
return writeStrategySettings;
47+
}
48+
3349
/**
3450
* BUILDER
3551
*/
3652
public static class Builder {
3753
private Long minActivePartitions = null;
3854
private Long partitionCountLimit = null;
55+
private AutoPartitioningStrategy autoPartitioningStrategy = null;
56+
private AlterAutoPartitioningWriteStrategySettings writeStrategySettings = null;
3957

4058
/**
4159
* @param minActivePartitions minimum partition count auto merge would stop working at.
@@ -58,6 +76,28 @@ public Builder setPartitionCountLimit(long partitionCountLimit) {
5876
return this;
5977
}
6078

79+
/**
80+
* @param autoPartitioningStrategy Strategy for auto partitioning.
81+
* Auto partitioning is disabled by default.
82+
* @return settings builder
83+
* @see AutoPartitioningStrategy#DISABLED
84+
*/
85+
public Builder setAutoPartitioningStrategy(AutoPartitioningStrategy autoPartitioningStrategy) {
86+
this.autoPartitioningStrategy = autoPartitioningStrategy;
87+
return this;
88+
}
89+
90+
/**
91+
* @param writeStrategySettings Settings for auto partitioning write strategy.
92+
* Does not have any effect if auto partitioning is disabled.
93+
* See {@link AlterAutoPartitioningWriteStrategySettings} for defaults
94+
* @return settings builder
95+
*/
96+
public Builder setWriteStrategySettings(AlterAutoPartitioningWriteStrategySettings writeStrategySettings) {
97+
this.writeStrategySettings = writeStrategySettings;
98+
return this;
99+
}
100+
61101
public AlterPartitioningSettings build() {
62102
return new AlterPartitioningSettings(this);
63103
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package tech.ydb.topic.settings;
2+
3+
public enum AutoPartitioningStrategy {
4+
/**
5+
* The auto partitioning is disabled.
6+
* You cannot disable the auto partitioning after it has been enabled.
7+
* @see AutoPartitioningStrategy#PAUSED
8+
*/
9+
DISABLED,
10+
/**
11+
* The auto partitioning algorithm will increase the partition count depending on the load characteristics.
12+
* The auto partitioning algorithm will never decrease the number of partitions.
13+
* @see AlterAutoPartitioningWriteStrategySettings
14+
*/
15+
SCALE_UP,
16+
/**
17+
* The auto partitioning algorithm will both increase and decrease partitions count depending on the load characteristics.
18+
* @see AlterAutoPartitioningWriteStrategySettings
19+
*/
20+
SCALE_UP_AND_DOWN,
21+
/**
22+
* The auto partitioning is paused.
23+
*/
24+
PAUSED;
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package tech.ydb.topic.settings;
2+
3+
import java.time.Duration;
4+
5+
public class AutoPartitioningWriteStrategySettings {
6+
private final Duration stabilizationWindow;
7+
private final int upUtilizationPercent;
8+
private final int downUtilizationPercent;
9+
10+
public AutoPartitioningWriteStrategySettings(Builder builder) {
11+
this.stabilizationWindow = builder.stabilizationWindow;
12+
this.upUtilizationPercent = builder.upUtilizationPercent;
13+
this.downUtilizationPercent = builder.downUtilizationPercent;
14+
}
15+
16+
public Duration getStabilizationWindow() {
17+
return stabilizationWindow;
18+
}
19+
20+
public int getUpUtilizationPercent() {
21+
return upUtilizationPercent;
22+
}
23+
24+
public int getDownUtilizationPercent() {
25+
return downUtilizationPercent;
26+
}
27+
28+
public static Builder newBuilder() {
29+
return new Builder();
30+
}
31+
32+
public static class Builder {
33+
private Duration stabilizationWindow = Duration.ofMinutes(5);
34+
private int upUtilizationPercent = 90;
35+
private int downUtilizationPercent = 30;
36+
37+
/**
38+
* @param stabilizationWindow Duration used by the auto partitioning algorithm to define if the partition must be split.
39+
* Default value is 5 minutes.
40+
* @return strategy builder
41+
*/
42+
public Builder setStabilizationWindow(Duration stabilizationWindow) {
43+
this.stabilizationWindow = stabilizationWindow;
44+
return this;
45+
}
46+
47+
/**
48+
* @param upUtilizationPercent Upper level of partition quota utilization after which the partition should be split.
49+
* Default value is 90%.
50+
* @return strategy builder
51+
*/
52+
public Builder setUpUtilizationPercent(int upUtilizationPercent) {
53+
this.upUtilizationPercent = upUtilizationPercent;
54+
return this;
55+
}
56+
57+
/**
58+
* @param downUtilizationPercent Lower level of partition quota utilization
59+
* after which the partition should be merged with the other one.
60+
* Default value is 30%.
61+
* @return strategy builder
62+
*/
63+
public Builder setDownUtilizationPercent(int downUtilizationPercent) {
64+
this.downUtilizationPercent = downUtilizationPercent;
65+
return this;
66+
}
67+
68+
public AutoPartitioningWriteStrategySettings build() {
69+
return new AutoPartitioningWriteStrategySettings(this);
70+
}
71+
}
72+
}

0 commit comments

Comments
 (0)