diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index 34a4c89a0619..d4a238211114 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -96,6 +96,7 @@ ydb/tests/fq/multi_plane py3test.[test_retry.py] chunk ydb/tests/fq/multi_plane test_retry.py.TestRetry.test_low_rate[kikimr0] ydb/tests/fq/multi_plane test_retry_high_rate.py.TestRetry.test_high_rate[kikimr0] ydb/tests/fq/s3 test_bindings_0.py.TestBindings.test_modify_connection_with_a_lot_of_bindings[v2-kikimr_settings0-client0] +ydb/tests/fq/streaming test_streaming.py.TestStreamingInYdb.test_read_topic_restore_state ydb/tests/fq/streaming test_streaming.py.TestStreamingInYdb.test_read_topic_shared_reading_restart_nodes ydb/tests/fq/yds test_2_selects_limit.py.TestSelectLimit.test_select_same[v1] ydb/tests/fq/yds test_2_selects_limit.py.TestSelectLimit.test_select_sequence[v1] diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/constructors.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/constructors.h index d513880359d7..d344775e962b 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/constructors.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/constructors.h @@ -66,7 +66,7 @@ class TSourceConstructor: public ICursorEntity, public TMoveOnly { } bool operator()(const TSourceConstructor& l, const TSourceConstructor& r) const { - return r.Start < l.Start; + return std::make_pair(r.Start, r.GetSourceId()) < std::make_pair(l.Start, l.GetSourceId()); } }; diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp index dad58f1e6053..8e350a4d2082 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp @@ -50,23 +50,29 @@ ISyncPoint::ESourceAction TSyncPointLimitControl::OnSourceReady( AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("Iterator", it.DebugString()); } for (auto it : DebugOrder) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("DebugOrder", it); + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("DebugOrder", it.DebugString()); + } + for (auto it : SourcesSequentially) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("SourcesSequentially", it->GetSourceId()); } if (FindIf(Iterators, [&](const auto& item) { return item.GetSourceId() == source->GetSourceId(); }) != Iterators.end()) { AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "portion is in heap") ("front", Iterators.front().DebugString()) + ("back", Iterators.back().DebugString()) ("source", source->GetAs()->GetStart().DebugString()) ("source_id", source->GetSourceId()); } - else if (Find(DebugOrder, source->GetSourceId()) != DebugOrder.end()) { + else if (FindIf(DebugOrder, [&](const auto& item) { return item.GetSourceId() == source->GetSourceId(); }) != DebugOrder.end()) { AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "known portion, not in heap") ("front", Iterators.front().DebugString()) + ("back", Iterators.back().DebugString()) ("source", source->GetAs()->GetStart().DebugString()) ("source_id", source->GetSourceId()); } else { AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "unknown portion") ("front", Iterators.front().DebugString()) + ("back", Iterators.back().DebugString()) ("source", source->GetAs()->GetStart().DebugString()) ("source_id", source->GetSourceId()); } @@ -112,6 +118,7 @@ TString TSyncPointLimitControl::TSourceIterator::DebugString() const { sb << "f=" << IsFilled() << ";"; sb << "record=" << SortableRecord->DebugJson() << ";"; sb << "start=" << Source->GetAs()->GetStart().DebugString() << ";"; + sb << "finish=" << Source->GetAs()->GetFinish().DebugString() << ";"; return sb; } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h index 9f7b6762ec4b..8c2cfd32c492 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h @@ -117,7 +117,7 @@ class TSyncPointLimitControl: public ISyncPoint { }; std::vector Iterators; - std::vector DebugOrder; + std::vector DebugOrder; virtual bool IsFinished() const override { return FetchedCount >= Limit || TBase::IsFinished(); @@ -126,7 +126,7 @@ class TSyncPointLimitControl: public ISyncPoint { virtual std::shared_ptr OnAddSource(const std::shared_ptr& source) override { AFL_VERIFY(FetchedCount < Limit); Iterators.emplace_back(TSourceIterator(source)); - DebugOrder.emplace_back(source->GetSourceId()); + DebugOrder.emplace_back(TSourceIterator(source)); std::push_heap(Iterators.begin(), Iterators.end()); return TBase::OnAddSource(source); }