Skip to content

Commit c370cbf

Browse files
authored
rd + watermarks: transfer single watermark per batch (backport #25209) (#25886)
1 parent 06f8c24 commit c370cbf

File tree

5 files changed

+86
-57
lines changed

5 files changed

+86
-57
lines changed

ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ NYT::TNode CreateTypeNode(NYT::TNode&& typeNode) {
2121
return CreateNamedNode("DataType", std::move(typeNode));
2222
}
2323

24+
NYT::TNode CreateOptionalTypeNode(NYT::TNode&& typeNode) {
25+
return CreateNamedNode("OptionalType", std::move(typeNode));
26+
}
27+
2428
NYT::TNode CreateStructTypeNode(NYT::TNode&& membersNode) {
2529
return CreateNamedNode("StructType", std::move(membersNode));
2630
}
@@ -70,7 +74,7 @@ NYT::TNode MakeWatermarkOutputSchema() {
7074
return CreateStructTypeNode(
7175
NYT::TNode::CreateList()
7276
.Add(CreateFieldNode(OFFSET_FIELD_NAME, CreateTypeNode("Uint64")))
73-
.Add(CreateFieldNode(WATERMARK_FIELD_NAME, CreateTypeNode("Timestamp")))
77+
.Add(CreateFieldNode(WATERMARK_FIELD_NAME, CreateOptionalTypeNode(CreateTypeNode("Timestamp"))))
7478
);
7579
}
7680

@@ -456,21 +460,10 @@ class TProgramRunHandler final : public IProgramRunHandler, public TNonCopyable
456460

457461
TStringBuilder sb;
458462
sb << R"(PRAGMA config.flags("LLVM", ")" << (settings.EnabledLLVM ? "ON" : "OFF") << R"(");)" << '\n';
459-
sb << "$input ="
460-
<< " SELECT "
461-
<< OFFSET_FIELD_NAME << ", "
462-
<< watermarkExpr << " AS " << WATERMARK_FIELD_NAME
463-
<< " FROM Input;\n";
464-
sb << "$output ="
465-
<< " SELECT "
466-
<< OFFSET_FIELD_NAME << ", "
467-
<< WATERMARK_FIELD_NAME
468-
<< " FROM $input"
469-
<< " WHERE " << WATERMARK_FIELD_NAME << " IS NOT NULL;\n";
470463
sb << "SELECT "
471464
<< OFFSET_FIELD_NAME << ", "
472-
<< "Unwrap(" << WATERMARK_FIELD_NAME << ") AS " << WATERMARK_FIELD_NAME
473-
<< " FROM $output;\n";
465+
<< watermarkExpr << " AS " << WATERMARK_FIELD_NAME
466+
<< " FROM Input;\n";
474467

475468
TString result = sb;
476469
LOG_ROW_DISPATCHER_DEBUG("Generated sql:\n" << result);

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -219,17 +219,31 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
219219
Client->StartClientSession();
220220
}
221221

222+
private:
223+
void OnWatermark(const NYql::NUdf::TUnboxedValue& rowIdValue, const NYql::NUdf::TUnboxedValue& maybeWatermark) {
224+
if (!maybeWatermark) {
225+
return;
226+
}
227+
auto rowId = rowIdValue.Get<ui64>();
228+
Offset = Self.Offsets->at(rowId);
229+
auto watermark = TInstant::MicroSeconds(maybeWatermark.Get<ui64>());
230+
if (Watermark < watermark) {
231+
Watermark = watermark;
232+
}
233+
LOG_ROW_DISPATCHER_TRACE("OnWatermark, row id: " << rowId << ", watermark: " << watermark);
234+
}
235+
236+
public:
222237
void OnData(const NYql::NUdf::TUnboxedValue* value) override {
223238
ui64 rowId;
224-
TMaybe<ui64> watermarkUs;
225239
if (value->IsEmbedded()) {
226240
rowId = value->Get<ui64>();
227241
} else if (value->IsBoxed()) {
228242
if (value->GetListLength() == 1) {
229243
rowId = value->GetElement(0).Get<ui64>();
230244
} else if (value->GetListLength() == 2) {
231-
rowId = value->GetElement(0).Get<ui64>();
232-
watermarkUs = value->GetElement(1).Get<ui64>();
245+
OnWatermark(value->GetElement(0), value->GetElement(1));
246+
return;
233247
} else {
234248
Y_ENSURE(false, "Unexpected output schema size");
235249
}
@@ -244,14 +258,6 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
244258
}
245259

246260
FilteredOffsets.insert(Offset);
247-
if (watermarkUs) {
248-
WatermarksUs.push_back(*watermarkUs);
249-
250-
const auto watermark = WatermarksUs.empty() ? Nothing() : TMaybe<TInstant>{TInstant::MicroSeconds(WatermarksUs.back())};
251-
LOG_ROW_DISPATCHER_TRACE("OnData, row id: " << rowId << ", offset: " << Offset << ", watermark: " << watermark);
252-
253-
return;
254-
}
255261

256262
Y_DEFER {
257263
// Values allocated on parser allocator and should be released
@@ -270,7 +276,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
270276
}
271277

272278
void OnBatchFinish() override {
273-
if (NewNumberRows == NumberRows && NewDataPackerSize == DataPackerSize && WatermarksUs.empty()) {
279+
if (NewNumberRows == NumberRows && NewDataPackerSize == DataPackerSize && !Watermark) {
274280
return;
275281
}
276282
if (const auto nextOffset = Client->GetNextMessageOffset(); nextOffset && Offset < *nextOffset) {
@@ -280,11 +286,10 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
280286

281287
const auto numberRows = NewNumberRows - NumberRows;
282288
const auto rowSize = NewDataPackerSize - DataPackerSize;
283-
const auto watermark = WatermarksUs.empty() ? Nothing() : TMaybe<TInstant>{TInstant::MicroSeconds(WatermarksUs.back())};
284289

285-
LOG_ROW_DISPATCHER_TRACE("OnBatchFinish, offset: " << Offset << ", number rows: " << numberRows << ", row size: " << rowSize << ", watermark: " << watermark);
290+
LOG_ROW_DISPATCHER_TRACE("OnBatchFinish, offset: " << Offset << ", number rows: " << numberRows << ", row size: " << rowSize << ", watermark: " << Watermark);
286291

287-
Client->AddDataToClient(Offset, numberRows, rowSize, watermark);
292+
Client->AddDataToClient(Offset, numberRows, rowSize, Watermark);
288293

289294
NumberRows = NewNumberRows;
290295
DataPackerSize = NewDataPackerSize;
@@ -313,15 +318,18 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
313318
}
314319

315320
void FinishPacking() {
316-
if (!DataPacker->IsEmpty() || !WatermarksUs.empty()) {
321+
if (!DataPacker->IsEmpty() || !Watermark.Empty()) {
317322
LOG_ROW_DISPATCHER_TRACE("FinishPacking, batch size: " << DataPackerSize << ", number rows: " << FilteredOffsets.size());
318-
ClientData.emplace(NYql::MakeReadOnlyRope(DataPacker->Finish()), FilteredOffsets, WatermarksUs);
323+
if (FilteredOffsets.empty()) {
324+
FilteredOffsets.emplace(Offset);
325+
}
326+
ClientData.emplace(NYql::MakeReadOnlyRope(DataPacker->Finish()), std::move(FilteredOffsets), Watermark);
319327
NumberRows = 0;
320328
NewNumberRows = 0;
321329
DataPackerSize = 0;
322330
NewDataPackerSize = 0;
323331
FilteredOffsets.clear();
324-
WatermarksUs.clear();
332+
Watermark.Clear();
325333
}
326334
}
327335

@@ -343,7 +351,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
343351
TVector<NYql::NUdf::TUnboxedValue> FilteredRow; // Temporary value holder for DataPacket
344352
std::unique_ptr<NKikimr::NMiniKQL::TValuePackerTransport<true>> DataPacker;
345353
TSet<ui64> FilteredOffsets; // Offsets of current batch in DataPacker
346-
TVector<ui64> WatermarksUs;
354+
TMaybe<TInstant> Watermark;
347355
TQueue<TDataBatch> ClientData;
348356
};
349357

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class IClientDataConsumer : public TThrRefBase {
3838
struct TDataBatch {
3939
TRope SerializedData;
4040
TSet<ui64> Offsets;
41-
TVector<ui64> WatermarksUs;
41+
TMaybe<TInstant> Watermark;
4242
};
4343

4444
class ITopicFormatHandler : public TNonCopyable {

ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp

Lines changed: 47 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ class TFormatHandlerFixture : public TBaseFixture {
1111
using TCallback = std::function<void(NActors::TActorId, TQueue<TDataBatch>&&)>;
1212
struct TMessages {
1313
TVector<ui64> Offsets;
14-
TVector<ui64> Watermark;
14+
TMaybe<TInstant> Watermark;
1515
TBatch Batch;
1616
};
1717

@@ -555,14 +555,14 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
555555
auto messages = TVector<TMessages>{
556556
{
557557
{firstOffset + 2, firstOffset + 3},
558-
{39'000'000, 40'000'000},
558+
TInstant::Seconds(40),
559559
TBatch()
560560
.AddRow(TRow().AddString("1970-01-01T00:00:44Z"))
561561
.AddRow(TRow().AddString("1970-01-01T00:00:45Z"))
562562
},
563563
{
564564
{firstOffset + 4, firstOffset + 5},
565-
{41'000'000, 42'000'000},
565+
TInstant::Seconds(42),
566566
TBatch()
567567
.AddRow(TRow().AddString("1970-01-01T00:00:46Z"))
568568
.AddRow(TRow().AddString("1970-01-01T00:00:47Z"))
@@ -584,18 +584,33 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
584584
messages = TVector<TMessages>{
585585
{
586586
{firstOffset + 4, firstOffset + 5},
587-
{41'000'000, 42'000'000},
587+
TInstant::Seconds(42),
588588
TBatch()
589589
.AddRow(TRow().AddString("1970-01-01T00:00:46Z"))
590590
.AddRow(TRow().AddString("1970-01-01T00:00:47Z"))
591591
},
592592
{
593593
{firstOffset + 6, firstOffset + 7},
594-
{43'000'000, 44'000'000},
594+
TInstant::Seconds(44),
595595
TBatch()
596596
.AddRow(TRow().AddString("1970-01-01T00:00:48Z"))
597597
.AddRow(TRow().AddString("1970-01-01T00:00:49Z"))
598598
},
599+
{
600+
{firstOffset + 60, firstOffset + 70},
601+
Nothing(),
602+
TBatch()
603+
.AddRow(TRow().AddString("1970-01-01T00:00:01Z")) // watermark = NULL
604+
.AddRow(TRow().AddString("1970-01-01T00:00:02Z")) // watermark = NULL
605+
},
606+
{
607+
{firstOffset + 600, firstOffset + 700, firstOffset + 800},
608+
TInstant::Seconds(0),
609+
TBatch()
610+
.AddRow(TRow().AddString("1970-01-01T00:00:03Z")) // watermark = NULL
611+
.AddRow(TRow().AddString("1970-01-01T00:00:05Z"))
612+
.AddRow(TRow().AddString("1970-01-01T00:00:04Z")) // watermark = NULL
613+
},
599614
};
600615
CheckSuccess(MakeClient(
601616
{{"ts", "[DataType; String]"}},
@@ -617,6 +632,17 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
617632
GetMessage(firstOffset + 7, R"({"ts": "1970-01-01T00:00:49Z"})"),
618633
});
619634

635+
ParseMessages({
636+
GetMessage(firstOffset + 60, R"({"ts": "1970-01-01T00:00:01Z"})"),
637+
GetMessage(firstOffset + 70, R"({"ts": "1970-01-01T00:00:02Z"})"),
638+
});
639+
640+
ParseMessages({
641+
GetMessage(firstOffset + 600, R"({"ts": "1970-01-01T00:00:03Z"})"),
642+
GetMessage(firstOffset + 700, R"({"ts": "1970-01-01T00:00:05Z"})"),
643+
GetMessage(firstOffset + 800, R"({"ts": "1970-01-01T00:00:04Z"})"),
644+
});
645+
620646
RemoveClient(ClientIds[1]);
621647

622648
ParseMessages({
@@ -635,14 +661,14 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
635661

636662
auto messages = TVector<TMessages>{
637663
{
638-
{firstOffset + 2, firstOffset + 3},
639-
{39'000'000, 40'000'000},
664+
{firstOffset + 2},
665+
TInstant::Seconds(40),
640666
TBatch()
641667
.AddRow(TRow().AddString("1970-01-01T00:00:44Z").AddUint64(1))
642668
},
643669
{
644-
{firstOffset + 4, firstOffset + 5},
645-
{41'000'000, 42'000'000},
670+
{firstOffset + 4},
671+
TInstant::Seconds(42),
646672
TBatch()
647673
.AddRow(TRow().AddString("1970-01-01T00:00:46Z").AddUint64(1))
648674
},
@@ -662,14 +688,14 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
662688

663689
messages = TVector<TMessages>{
664690
{
665-
{firstOffset + 4, firstOffset + 5},
666-
{41'000'000, 42'000'000},
691+
{firstOffset + 4},
692+
TInstant::Seconds(42),
667693
TBatch()
668694
.AddRow(TRow().AddString("1970-01-01T00:00:46Z").AddUint64(1))
669695
},
670696
{
671-
{firstOffset + 6, firstOffset + 7},
672-
{43'000'000, 44'000'000},
697+
{firstOffset + 6},
698+
TInstant::Seconds(44),
673699
TBatch()
674700
.AddRow(TRow().AddString("1970-01-01T00:00:48Z").AddUint64(1))
675701
},
@@ -712,13 +738,13 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
712738

713739
auto messages = TVector<TMessages>{
714740
{
715-
{firstOffset + 2, firstOffset + 3},
716-
{39'000'000, 40'000'000},
741+
{firstOffset + 3},
742+
TInstant::Seconds(40),
717743
TBatch()
718744
},
719745
{
720-
{firstOffset + 4, firstOffset + 5},
721-
{41'000'000, 42'000'000},
746+
{firstOffset + 5},
747+
TInstant::Seconds(42),
722748
TBatch()
723749
},
724750
};
@@ -737,13 +763,13 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
737763

738764
messages = TVector<TMessages>{
739765
{
740-
{firstOffset + 4, firstOffset + 5},
741-
{41'000'000, 42'000'000},
766+
{firstOffset + 5},
767+
TInstant::Seconds(42),
742768
TBatch()
743769
},
744770
{
745-
{firstOffset + 6, firstOffset + 7},
746-
{43'000'000, 44'000'000},
771+
{firstOffset + 7},
772+
TInstant::Seconds(44),
747773
TBatch()
748774
},
749775
};

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -697,7 +697,7 @@ void TTopicSession::SendData(TClientsInfo& info) {
697697

698698
ui64 batchSize = 0;
699699
while (!buffer.empty()) {
700-
auto [serializedData, offsets, watermarksUs] = std::move(buffer.front());
700+
auto [serializedData, offsets, watermark] = std::move(buffer.front());
701701
Y_ENSURE(!offsets.empty(), "Expected non empty message batch");
702702
buffer.pop();
703703

@@ -706,7 +706,9 @@ void TTopicSession::SendData(TClientsInfo& info) {
706706
NFq::NRowDispatcherProto::TEvMessage message;
707707
message.SetPayloadId(event->AddPayload(std::move(serializedData)));
708708
message.MutableOffsets()->Assign(offsets.begin(), offsets.end());
709-
message.MutableWatermarksUs()->Assign(watermarksUs.begin(), watermarksUs.end());
709+
if (watermark) {
710+
message.AddWatermarksUs(watermark->MicroSeconds());
711+
}
710712
event->Record.AddMessages()->CopyFrom(std::move(message));
711713
event->Record.SetNextMessageOffset(*offsets.rbegin() + 1);
712714

0 commit comments

Comments
 (0)