Skip to content

Commit 76c49c8

Browse files
yumkam authored and APozdniakov committed
rd + watermarks: transfer single watermark per batch (backport ydb-platform#25209) (ydb-platform#25886)
1 parent 422a43c commit 76c49c8

File tree

5 files changed

+86
-57
lines changed

5 files changed

+86
-57
lines changed

ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp

Lines changed: 7 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,10 @@ NYT::TNode CreateTypeNode(NYT::TNode&& typeNode) {
2121
return CreateNamedNode("DataType", std::move(typeNode));
2222
}
2323

24+
NYT::TNode CreateOptionalTypeNode(NYT::TNode&& typeNode) {
25+
return CreateNamedNode("OptionalType", std::move(typeNode));
26+
}
27+
2428
NYT::TNode CreateStructTypeNode(NYT::TNode&& membersNode) {
2529
return CreateNamedNode("StructType", std::move(membersNode));
2630
}
@@ -70,7 +74,7 @@ NYT::TNode MakeWatermarkOutputSchema() {
7074
return CreateStructTypeNode(
7175
NYT::TNode::CreateList()
7276
.Add(CreateFieldNode(OFFSET_FIELD_NAME, CreateTypeNode("Uint64")))
73-
.Add(CreateFieldNode(WATERMARK_FIELD_NAME, CreateTypeNode("Timestamp")))
77+
.Add(CreateFieldNode(WATERMARK_FIELD_NAME, CreateOptionalTypeNode(CreateTypeNode("Timestamp"))))
7478
);
7579
}
7680

@@ -456,21 +460,10 @@ class TProgramRunHandler final : public IProgramRunHandler, public TNonCopyable
456460

457461
TStringBuilder sb;
458462
sb << R"(PRAGMA config.flags("LLVM", ")" << (settings.EnabledLLVM ? "ON" : "OFF") << R"(");)" << '\n';
459-
sb << "$input ="
460-
<< " SELECT "
461-
<< OFFSET_FIELD_NAME << ", "
462-
<< watermarkExpr << " AS " << WATERMARK_FIELD_NAME
463-
<< " FROM Input;\n";
464-
sb << "$output ="
465-
<< " SELECT "
466-
<< OFFSET_FIELD_NAME << ", "
467-
<< WATERMARK_FIELD_NAME
468-
<< " FROM $input"
469-
<< " WHERE " << WATERMARK_FIELD_NAME << " IS NOT NULL;\n";
470463
sb << "SELECT "
471464
<< OFFSET_FIELD_NAME << ", "
472-
<< "Unwrap(" << WATERMARK_FIELD_NAME << ") AS " << WATERMARK_FIELD_NAME
473-
<< " FROM $output;\n";
465+
<< watermarkExpr << " AS " << WATERMARK_FIELD_NAME
466+
<< " FROM Input;\n";
474467

475468
TString result = sb;
476469
LOG_ROW_DISPATCHER_DEBUG("Generated sql:\n" << result);

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp

Lines changed: 27 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -221,17 +221,31 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
221221
Client->StartClientSession();
222222
}
223223

224+
private:
225+
void OnWatermark(const NYql::NUdf::TUnboxedValue& rowIdValue, const NYql::NUdf::TUnboxedValue& maybeWatermark) {
226+
if (!maybeWatermark) {
227+
return;
228+
}
229+
auto rowId = rowIdValue.Get<ui64>();
230+
Offset = Self.Offsets->at(rowId);
231+
auto watermark = TInstant::MicroSeconds(maybeWatermark.Get<ui64>());
232+
if (Watermark < watermark) {
233+
Watermark = watermark;
234+
}
235+
LOG_ROW_DISPATCHER_TRACE("OnWatermark, row id: " << rowId << ", watermark: " << watermark);
236+
}
237+
238+
public:
224239
void OnData(const NYql::NUdf::TUnboxedValue* value) override {
225240
ui64 rowId;
226-
TMaybe<ui64> watermarkUs;
227241
if (value->IsEmbedded()) {
228242
rowId = value->Get<ui64>();
229243
} else if (value->IsBoxed()) {
230244
if (value->GetListLength() == 1) {
231245
rowId = value->GetElement(0).Get<ui64>();
232246
} else if (value->GetListLength() == 2) {
233-
rowId = value->GetElement(0).Get<ui64>();
234-
watermarkUs = value->GetElement(1).Get<ui64>();
247+
OnWatermark(value->GetElement(0), value->GetElement(1));
248+
return;
235249
} else {
236250
Y_ENSURE(false, "Unexpected output schema size");
237251
}
@@ -246,14 +260,6 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
246260
}
247261

248262
FilteredOffsets.insert(Offset);
249-
if (watermarkUs) {
250-
WatermarksUs.push_back(*watermarkUs);
251-
252-
const auto watermark = WatermarksUs.empty() ? Nothing() : TMaybe<TInstant>{TInstant::MicroSeconds(WatermarksUs.back())};
253-
LOG_ROW_DISPATCHER_TRACE("OnData, row id: " << rowId << ", offset: " << Offset << ", watermark: " << watermark);
254-
255-
return;
256-
}
257263

258264
Y_DEFER {
259265
// Values allocated on parser allocator and should be released
@@ -272,7 +278,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
272278
}
273279

274280
void OnBatchFinish() override {
275-
if (NewNumberRows == NumberRows && NewDataPackerSize == DataPackerSize && WatermarksUs.empty()) {
281+
if (NewNumberRows == NumberRows && NewDataPackerSize == DataPackerSize && !Watermark) {
276282
return;
277283
}
278284
if (const auto nextOffset = Client->GetNextMessageOffset(); nextOffset && Offset < *nextOffset) {
@@ -282,11 +288,10 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
282288

283289
const auto numberRows = NewNumberRows - NumberRows;
284290
const auto rowSize = NewDataPackerSize - DataPackerSize;
285-
const auto watermark = WatermarksUs.empty() ? Nothing() : TMaybe<TInstant>{TInstant::MicroSeconds(WatermarksUs.back())};
286291

287-
LOG_ROW_DISPATCHER_TRACE("OnBatchFinish, offset: " << Offset << ", number rows: " << numberRows << ", row size: " << rowSize << ", watermark: " << watermark);
292+
LOG_ROW_DISPATCHER_TRACE("OnBatchFinish, offset: " << Offset << ", number rows: " << numberRows << ", row size: " << rowSize << ", watermark: " << Watermark);
288293

289-
Client->AddDataToClient(Offset, numberRows, rowSize, watermark);
294+
Client->AddDataToClient(Offset, numberRows, rowSize, Watermark);
290295

291296
NumberRows = NewNumberRows;
292297
DataPackerSize = NewDataPackerSize;
@@ -315,15 +320,18 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
315320
}
316321

317322
void FinishPacking() {
318-
if (!DataPacker->IsEmpty() || !WatermarksUs.empty()) {
323+
if (!DataPacker->IsEmpty() || !Watermark.Empty()) {
319324
LOG_ROW_DISPATCHER_TRACE("FinishPacking, batch size: " << DataPackerSize << ", number rows: " << FilteredOffsets.size());
320-
ClientData.emplace(NYql::MakeReadOnlyRope(DataPacker->Finish()), FilteredOffsets, WatermarksUs);
325+
if (FilteredOffsets.empty()) {
326+
FilteredOffsets.emplace(Offset);
327+
}
328+
ClientData.emplace(NYql::MakeReadOnlyRope(DataPacker->Finish()), std::move(FilteredOffsets), Watermark);
321329
NumberRows = 0;
322330
NewNumberRows = 0;
323331
DataPackerSize = 0;
324332
NewDataPackerSize = 0;
325333
FilteredOffsets.clear();
326-
WatermarksUs.clear();
334+
Watermark.Clear();
327335
}
328336
}
329337

@@ -345,7 +353,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
345353
TVector<NYql::NUdf::TUnboxedValue> FilteredRow; // Temporary value holder for DataPacket
346354
std::unique_ptr<NKikimr::NMiniKQL::TValuePackerTransport<true>> DataPacker;
347355
TSet<ui64> FilteredOffsets; // Offsets of current batch in DataPacker
348-
TVector<ui64> WatermarksUs;
356+
TMaybe<TInstant> Watermark;
349357
TQueue<TDataBatch> ClientData;
350358
};
351359

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,7 @@ class IClientDataConsumer : public TThrRefBase {
3838
struct TDataBatch {
3939
TRope SerializedData;
4040
TSet<ui64> Offsets;
41-
TVector<ui64> WatermarksUs;
41+
TMaybe<TInstant> Watermark;
4242
};
4343

4444
class ITopicFormatHandler : public TNonCopyable {

ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp

Lines changed: 47 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@ class TFormatHandlerFixture : public TBaseFixture {
1111
using TCallback = std::function<void(NActors::TActorId, TQueue<TDataBatch>&&)>;
1212
struct TMessages {
1313
TVector<ui64> Offsets;
14-
TVector<ui64> Watermark;
14+
TMaybe<TInstant> Watermark;
1515
TBatch Batch;
1616
};
1717

@@ -555,14 +555,14 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
555555
auto messages = TVector<TMessages>{
556556
{
557557
{firstOffset + 2, firstOffset + 3},
558-
{39'000'000, 40'000'000},
558+
TInstant::Seconds(40),
559559
TBatch()
560560
.AddRow(TRow().AddString("1970-01-01T00:00:44Z"))
561561
.AddRow(TRow().AddString("1970-01-01T00:00:45Z"))
562562
},
563563
{
564564
{firstOffset + 4, firstOffset + 5},
565-
{41'000'000, 42'000'000},
565+
TInstant::Seconds(42),
566566
TBatch()
567567
.AddRow(TRow().AddString("1970-01-01T00:00:46Z"))
568568
.AddRow(TRow().AddString("1970-01-01T00:00:47Z"))
@@ -584,18 +584,33 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
584584
messages = TVector<TMessages>{
585585
{
586586
{firstOffset + 4, firstOffset + 5},
587-
{41'000'000, 42'000'000},
587+
TInstant::Seconds(42),
588588
TBatch()
589589
.AddRow(TRow().AddString("1970-01-01T00:00:46Z"))
590590
.AddRow(TRow().AddString("1970-01-01T00:00:47Z"))
591591
},
592592
{
593593
{firstOffset + 6, firstOffset + 7},
594-
{43'000'000, 44'000'000},
594+
TInstant::Seconds(44),
595595
TBatch()
596596
.AddRow(TRow().AddString("1970-01-01T00:00:48Z"))
597597
.AddRow(TRow().AddString("1970-01-01T00:00:49Z"))
598598
},
599+
{
600+
{firstOffset + 60, firstOffset + 70},
601+
Nothing(),
602+
TBatch()
603+
.AddRow(TRow().AddString("1970-01-01T00:00:01Z")) // watermark = NULL
604+
.AddRow(TRow().AddString("1970-01-01T00:00:02Z")) // watermark = NULL
605+
},
606+
{
607+
{firstOffset + 600, firstOffset + 700, firstOffset + 800},
608+
TInstant::Seconds(0),
609+
TBatch()
610+
.AddRow(TRow().AddString("1970-01-01T00:00:03Z")) // watermark = NULL
611+
.AddRow(TRow().AddString("1970-01-01T00:00:05Z"))
612+
.AddRow(TRow().AddString("1970-01-01T00:00:04Z")) // watermark = NULL
613+
},
599614
};
600615
CheckSuccess(MakeClient(
601616
{{"ts", "[DataType; String]"}},
@@ -617,6 +632,17 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
617632
GetMessage(firstOffset + 7, R"({"ts": "1970-01-01T00:00:49Z"})"),
618633
});
619634

635+
ParseMessages({
636+
GetMessage(firstOffset + 60, R"({"ts": "1970-01-01T00:00:01Z"})"),
637+
GetMessage(firstOffset + 70, R"({"ts": "1970-01-01T00:00:02Z"})"),
638+
});
639+
640+
ParseMessages({
641+
GetMessage(firstOffset + 600, R"({"ts": "1970-01-01T00:00:03Z"})"),
642+
GetMessage(firstOffset + 700, R"({"ts": "1970-01-01T00:00:05Z"})"),
643+
GetMessage(firstOffset + 800, R"({"ts": "1970-01-01T00:00:04Z"})"),
644+
});
645+
620646
RemoveClient(ClientIds[1]);
621647

622648
ParseMessages({
@@ -635,14 +661,14 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
635661

636662
auto messages = TVector<TMessages>{
637663
{
638-
{firstOffset + 2, firstOffset + 3},
639-
{39'000'000, 40'000'000},
664+
{firstOffset + 2},
665+
TInstant::Seconds(40),
640666
TBatch()
641667
.AddRow(TRow().AddString("1970-01-01T00:00:44Z").AddUint64(1))
642668
},
643669
{
644-
{firstOffset + 4, firstOffset + 5},
645-
{41'000'000, 42'000'000},
670+
{firstOffset + 4},
671+
TInstant::Seconds(42),
646672
TBatch()
647673
.AddRow(TRow().AddString("1970-01-01T00:00:46Z").AddUint64(1))
648674
},
@@ -662,14 +688,14 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
662688

663689
messages = TVector<TMessages>{
664690
{
665-
{firstOffset + 4, firstOffset + 5},
666-
{41'000'000, 42'000'000},
691+
{firstOffset + 4},
692+
TInstant::Seconds(42),
667693
TBatch()
668694
.AddRow(TRow().AddString("1970-01-01T00:00:46Z").AddUint64(1))
669695
},
670696
{
671-
{firstOffset + 6, firstOffset + 7},
672-
{43'000'000, 44'000'000},
697+
{firstOffset + 6},
698+
TInstant::Seconds(44),
673699
TBatch()
674700
.AddRow(TRow().AddString("1970-01-01T00:00:48Z").AddUint64(1))
675701
},
@@ -712,13 +738,13 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
712738

713739
auto messages = TVector<TMessages>{
714740
{
715-
{firstOffset + 2, firstOffset + 3},
716-
{39'000'000, 40'000'000},
741+
{firstOffset + 3},
742+
TInstant::Seconds(40),
717743
TBatch()
718744
},
719745
{
720-
{firstOffset + 4, firstOffset + 5},
721-
{41'000'000, 42'000'000},
746+
{firstOffset + 5},
747+
TInstant::Seconds(42),
722748
TBatch()
723749
},
724750
};
@@ -737,13 +763,13 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
737763

738764
messages = TVector<TMessages>{
739765
{
740-
{firstOffset + 4, firstOffset + 5},
741-
{41'000'000, 42'000'000},
766+
{firstOffset + 5},
767+
TInstant::Seconds(42),
742768
TBatch()
743769
},
744770
{
745-
{firstOffset + 6, firstOffset + 7},
746-
{43'000'000, 44'000'000},
771+
{firstOffset + 7},
772+
TInstant::Seconds(44),
747773
TBatch()
748774
},
749775
};

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -697,7 +697,7 @@ void TTopicSession::SendData(TClientsInfo& info) {
697697

698698
ui64 batchSize = 0;
699699
while (!buffer.empty()) {
700-
auto [serializedData, offsets, watermarksUs] = std::move(buffer.front());
700+
auto [serializedData, offsets, watermark] = std::move(buffer.front());
701701
Y_ENSURE(!offsets.empty(), "Expected non empty message batch");
702702
buffer.pop();
703703

@@ -706,7 +706,9 @@ void TTopicSession::SendData(TClientsInfo& info) {
706706
NFq::NRowDispatcherProto::TEvMessage message;
707707
message.SetPayloadId(event->AddPayload(std::move(serializedData)));
708708
message.MutableOffsets()->Assign(offsets.begin(), offsets.end());
709-
message.MutableWatermarksUs()->Assign(watermarksUs.begin(), watermarksUs.end());
709+
if (watermark) {
710+
message.AddWatermarksUs(watermark->MicroSeconds());
711+
}
710712
event->Record.AddMessages()->CopyFrom(std::move(message));
711713
event->Record.SetNextMessageOffset(*offsets.rbegin() + 1);
712714

0 commit comments

Comments
 (0)