Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ ydb/tests/fq/multi_plane py3test.[test_retry.py] chunk
ydb/tests/fq/multi_plane test_retry.py.TestRetry.test_low_rate[kikimr0]
ydb/tests/fq/multi_plane test_retry_high_rate.py.TestRetry.test_high_rate[kikimr0]
ydb/tests/fq/s3 test_bindings_0.py.TestBindings.test_modify_connection_with_a_lot_of_bindings[v2-kikimr_settings0-client0]
ydb/tests/fq/streaming test_streaming.py.TestStreamingInYdb.test_read_topic_restore_state
ydb/tests/fq/streaming test_streaming.py.TestStreamingInYdb.test_read_topic_shared_reading_restart_nodes
ydb/tests/fq/yds test_2_selects_limit.py.TestSelectLimit.test_select_same[v1]
ydb/tests/fq/yds test_2_selects_limit.py.TestSelectLimit.test_select_sequence[v1]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class TSourceConstructor: public ICursorEntity, public TMoveOnly {
}

bool operator()(const TSourceConstructor& l, const TSourceConstructor& r) const {
return r.Start < l.Start;
return std::make_pair(r.Start, r.GetSourceId()) < std::make_pair(l.Start, l.GetSourceId());
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,23 +50,29 @@ ISyncPoint::ESourceAction TSyncPointLimitControl::OnSourceReady(
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("Iterator", it.DebugString());
}
for (auto it : DebugOrder) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("DebugOrder", it);
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("DebugOrder", it.DebugString());
}
for (auto it : SourcesSequentially) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("SourcesSequentially", it->GetSourceId());
}
if (FindIf(Iterators, [&](const auto& item) { return item.GetSourceId() == source->GetSourceId(); }) != Iterators.end()) {
AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "portion is in heap")
("front", Iterators.front().DebugString())
("back", Iterators.back().DebugString())
("source", source->GetAs<TPortionDataSource>()->GetStart().DebugString())
("source_id", source->GetSourceId());
}
else if (Find(DebugOrder, source->GetSourceId()) != DebugOrder.end()) {
else if (FindIf(DebugOrder, [&](const auto& item) { return item.GetSourceId() == source->GetSourceId(); }) != DebugOrder.end()) {
AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "known portion, not in heap")
("front", Iterators.front().DebugString())
("back", Iterators.back().DebugString())
("source", source->GetAs<TPortionDataSource>()->GetStart().DebugString())
("source_id", source->GetSourceId());
}
else {
AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "unknown portion")
("front", Iterators.front().DebugString())
("back", Iterators.back().DebugString())
("source", source->GetAs<TPortionDataSource>()->GetStart().DebugString())
("source_id", source->GetSourceId());
}
Expand Down Expand Up @@ -112,6 +118,7 @@ TString TSyncPointLimitControl::TSourceIterator::DebugString() const {
sb << "f=" << IsFilled() << ";";
sb << "record=" << SortableRecord->DebugJson() << ";";
sb << "start=" << Source->GetAs<TPortionDataSource>()->GetStart().DebugString() << ";";
sb << "finish=" << Source->GetAs<TPortionDataSource>()->GetFinish().DebugString() << ";";
return sb;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class TSyncPointLimitControl: public ISyncPoint {
};

std::vector<TSourceIterator> Iterators;
std::vector<ui64> DebugOrder;
std::vector<TSourceIterator> DebugOrder;

virtual bool IsFinished() const override {
return FetchedCount >= Limit || TBase::IsFinished();
Expand All @@ -126,7 +126,7 @@ class TSyncPointLimitControl: public ISyncPoint {
virtual std::shared_ptr<NCommon::IDataSource> OnAddSource(const std::shared_ptr<NCommon::IDataSource>& source) override {
AFL_VERIFY(FetchedCount < Limit);
Iterators.emplace_back(TSourceIterator(source));
DebugOrder.emplace_back(source->GetSourceId());
DebugOrder.emplace_back(TSourceIterator(source));
std::push_heap(Iterators.begin(), Iterators.end());
return TBase::OnAddSource(source);
}
Expand Down
Loading