Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class TSourceConstructor: public ICursorEntity, public TMoveOnly {
}

bool operator()(const TSourceConstructor& l, const TSourceConstructor& r) const {
return r.Start < l.Start;
return std::make_pair(r.Start, r.GetSourceId()) < std::make_pair(l.Start, l.GetSourceId());
}
};

Expand All @@ -75,7 +75,7 @@ class TPortionsSources: public NCommon::TSourcesConstructorWithAccessors<TSource
private:
using TBase = NCommon::TSourcesConstructorWithAccessors<TSourceConstructor>;
ui32 CurrentSourceIdx = 0;
std::vector<TInsertWriteId> Uncommitted;
std::vector<TInsertWriteId> Uncommitted;

virtual void DoFillReadStats(TReadStats& stats) const override {
ui64 compactedPortionsBytes = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,23 +50,29 @@ ISyncPoint::ESourceAction TSyncPointLimitControl::OnSourceReady(
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("Iterator", it.DebugString());
}
for (auto it : DebugOrder) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("DebugOrder", it);
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("DebugOrder", it.DebugString());
}
for (auto it : SourcesSequentially) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("SourcesSequentially", it->GetSourceId());
}
if (FindIf(Iterators, [&](const auto& item) { return item.GetSourceId() == source->GetSourceId(); }) != Iterators.end()) {
AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "portion is in heap")
("front", Iterators.front().DebugString())
("back", Iterators.back().DebugString())
("source", source->GetAs<TPortionDataSource>()->GetStart().DebugString())
("source_id", source->GetSourceId());
}
else if (Find(DebugOrder, source->GetSourceId()) != DebugOrder.end()) {
else if (FindIf(DebugOrder, [&](const auto& item) { return item.GetSourceId() == source->GetSourceId(); }) != DebugOrder.end()) {
AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "known portion, not in heap")
("front", Iterators.front().DebugString())
("back", Iterators.back().DebugString())
("source", source->GetAs<TPortionDataSource>()->GetStart().DebugString())
("source_id", source->GetSourceId());
}
else {
AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "unknown portion")
("front", Iterators.front().DebugString())
("back", Iterators.back().DebugString())
("source", source->GetAs<TPortionDataSource>()->GetStart().DebugString())
("source_id", source->GetSourceId());
}
Expand Down Expand Up @@ -112,6 +118,7 @@ TString TSyncPointLimitControl::TSourceIterator::DebugString() const {
sb << "f=" << IsFilled() << ";";
sb << "record=" << SortableRecord->DebugJson() << ";";
sb << "start=" << Source->GetAs<TPortionDataSource>()->GetStart().DebugString() << ";";
sb << "finish=" << Source->GetAs<TPortionDataSource>()->GetFinish().DebugString() << ";";
return sb;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class TSyncPointLimitControl: public ISyncPoint {
};

std::vector<TSourceIterator> Iterators;
std::vector<ui64> DebugOrder;
std::vector<TSourceIterator> DebugOrder;

virtual bool IsFinished() const override {
return FetchedCount >= Limit || TBase::IsFinished();
Expand All @@ -126,7 +126,7 @@ class TSyncPointLimitControl: public ISyncPoint {
virtual std::shared_ptr<NCommon::IDataSource> OnAddSource(const std::shared_ptr<NCommon::IDataSource>& source) override {
AFL_VERIFY(FetchedCount < Limit);
Iterators.emplace_back(TSourceIterator(source));
DebugOrder.emplace_back(source->GetSourceId());
DebugOrder.emplace_back(TSourceIterator(source));
std::push_heap(Iterators.begin(), Iterators.end());
return TBase::OnAddSource(source);
}
Expand Down
Loading