Skip to content

Commit 18ae5ce

Browse files
authored
fixed MaxCommittedTimeLag (#27553)
1 parent ab58703 commit 18ae5ce

File tree

2 files changed

+18
-18
lines changed

2 files changed

+18
-18
lines changed

ydb/core/persqueue/pqtablet/partition/partition.cpp

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -817,9 +817,6 @@ TConsumerSnapshot TPartition::CreateSnapshot(TUserInfo& userInfo) const {
817817
if (userInfo.Offset >= static_cast<i64>(GetEndOffset())) {
818818
result.LastCommittedMessage.CreateTimestamp = now;
819819
result.LastCommittedMessage.WriteTimestamp = now;
820-
} else if (userInfo.ActualTimestamps) {
821-
result.LastCommittedMessage.CreateTimestamp = userInfo.CreateTimestamp;
822-
result.LastCommittedMessage.WriteTimestamp = userInfo.WriteTimestamp;
823820
} else {
824821
auto timestamp = GetWriteTimeEstimate(userInfo.Offset);
825822
result.LastCommittedMessage.CreateTimestamp = timestamp;
@@ -841,14 +838,14 @@ TConsumerSnapshot TPartition::CreateSnapshot(TUserInfo& userInfo) const {
841838
result.LastReadMessage.WriteTimestamp = userInfo.ReadWriteTimestamp;
842839
} else {
843840
auto timestamp = GetWriteTimeEstimate(readOffset);
844-
result.LastCommittedMessage.CreateTimestamp = timestamp;
845-
result.LastCommittedMessage.WriteTimestamp = timestamp;
841+
result.LastReadMessage.CreateTimestamp = timestamp;
842+
result.LastReadMessage.WriteTimestamp = timestamp;
846843
}
847844

848845
if (readOffset < (i64)GetEndOffset()) {
849846
result.ReadLag = result.LastReadTimestamp - result.LastReadMessage.WriteTimestamp;
850847
}
851-
result.CommitedLag = result.LastCommittedMessage.WriteTimestamp - now;
848+
result.CommitedLag = now - result.LastCommittedMessage.WriteTimestamp;
852849
result.TotalLag = TDuration::MilliSeconds(userInfo.GetWriteLagMs()) + result.ReadLag + (now - result.LastReadTimestamp);
853850

854851
return result;

ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ Y_UNIT_TEST_SUITE(WithSDK) {
5555
session->Close(TDuration::Seconds(5));
5656
};
5757

58-
// Check describe for empty topic
58+
Cerr << ">>>>> Check describe for empty topic\n";
5959
{
6060
auto d = describe();
6161
UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName());
@@ -75,44 +75,47 @@ Y_UNIT_TEST_SUITE(WithSDK) {
7575
}
7676

7777
write(3);
78+
Sleep(TDuration::Seconds(2));
7879
write(7);
80+
Sleep(TDuration::Seconds(2));
81+
write(11);
7982

80-
// Check describe for topic which contains messages, but consumer hasn`t read
83+
Cerr << ">>>>> Check describe for topic which contains messages, but consumer hasn`t read\n";
8184
{
8285
auto d = describe();
8386
UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName());
8487
UNIT_ASSERT_VALUES_EQUAL(1, d.GetPartitions().size());
8588
auto& p = d.GetPartitions()[0];
8689
UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId());
8790
UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive());
88-
UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset());
91+
UNIT_ASSERT_VALUES_EQUAL(3, p.GetPartitionStats()->GetEndOffset());
8992
auto& c = p.GetPartitionConsumerStats();
9093
UNIT_ASSERT_VALUES_EQUAL(true, c.has_value());
9194
UNIT_ASSERT_VALUES_EQUAL(0, c->GetCommittedOffset());
9295
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag()); //
9396
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag());
94-
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag());
97+
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(4), c->GetMaxCommittedTimeLag());
9598
UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); // why not zero?
9699
UNIT_ASSERT_VALUES_EQUAL(1, c->GetLastReadOffset());
97100
}
98101

99102
UNIT_ASSERT(setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1).IsSuccess());
100103

101-
// Check describe for topic whis contains messages, has commited offset but hasn`t read (restart tablet for example)
104+
Cerr << ">>>>> Check describe for topic whis contains messages, has commited offset but hasn`t read (restart tablet for example)\n";
102105
{
103106
auto d = describe();
104107
UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName());
105108
UNIT_ASSERT_VALUES_EQUAL(1, d.GetPartitions().size());
106109
auto& p = d.GetPartitions()[0];
107110
UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId());
108111
UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive());
109-
UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset());
112+
UNIT_ASSERT_VALUES_EQUAL(3, p.GetPartitionStats()->GetEndOffset());
110113
auto& c = p.GetPartitionConsumerStats();
111114
UNIT_ASSERT_VALUES_EQUAL(true, c.has_value());
112115
UNIT_ASSERT_VALUES_EQUAL(1, c->GetCommittedOffset());
113116
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag());
114117
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag());
115-
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag());
118+
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(4), c->GetMaxCommittedTimeLag());
116119
UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); // why not zero?
117120
UNIT_ASSERT_VALUES_EQUAL(1, c->GetLastReadOffset());
118121
}
@@ -143,23 +146,23 @@ Y_UNIT_TEST_SUITE(WithSDK) {
143146
session->Close(TDuration::Seconds(1));
144147
}
145148

146-
// Check describe for topic wich contains messages, has commited offset of first message and read second message
149+
Cerr << ">>>>> Check describe for topic wich contains messages, has commited offset of first message and read second message\n";
147150
{
148151
auto d = describe();
149152
UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName());
150153
UNIT_ASSERT_VALUES_EQUAL(1, d.GetPartitions().size());
151154
auto& p = d.GetPartitions()[0];
152155
UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId());
153156
UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive());
154-
UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset());
157+
UNIT_ASSERT_VALUES_EQUAL(3, p.GetPartitionStats()->GetEndOffset());
155158
auto& c = p.GetPartitionConsumerStats();
156159
UNIT_ASSERT_VALUES_EQUAL(true, c.has_value());
157160
UNIT_ASSERT_VALUES_EQUAL(1, c->GetCommittedOffset());
158-
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag());
161+
//UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag());
159162
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag());
160-
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag());
163+
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(4), c->GetMaxCommittedTimeLag());
161164
UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3));
162-
UNIT_ASSERT_VALUES_EQUAL(2, c->GetLastReadOffset());
165+
UNIT_ASSERT_VALUES_EQUAL(3, c->GetLastReadOffset());
163166
}
164167
}
165168

0 commit comments

Comments
 (0)