Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ NYT::TNode CreateTypeNode(NYT::TNode&& typeNode) {
return CreateNamedNode("DataType", std::move(typeNode));
}

NYT::TNode CreateOptionalTypeNode(NYT::TNode&& typeNode) {
return CreateNamedNode("OptionalType", std::move(typeNode));
}

NYT::TNode CreateStructTypeNode(NYT::TNode&& membersNode) {
return CreateNamedNode("StructType", std::move(membersNode));
}
Expand Down Expand Up @@ -70,7 +74,7 @@ NYT::TNode MakeWatermarkOutputSchema() {
return CreateStructTypeNode(
NYT::TNode::CreateList()
.Add(CreateFieldNode(OFFSET_FIELD_NAME, CreateTypeNode("Uint64")))
.Add(CreateFieldNode(WATERMARK_FIELD_NAME, CreateTypeNode("Timestamp")))
.Add(CreateFieldNode(WATERMARK_FIELD_NAME, CreateOptionalTypeNode(CreateTypeNode("Timestamp"))))
);
}

Expand Down Expand Up @@ -456,21 +460,10 @@ class TProgramRunHandler final : public IProgramRunHandler, public TNonCopyable

TStringBuilder sb;
sb << R"(PRAGMA config.flags("LLVM", ")" << (settings.EnabledLLVM ? "ON" : "OFF") << R"(");)" << '\n';
sb << "$input ="
<< " SELECT "
<< OFFSET_FIELD_NAME << ", "
<< watermarkExpr << " AS " << WATERMARK_FIELD_NAME
<< " FROM Input;\n";
sb << "$output ="
<< " SELECT "
<< OFFSET_FIELD_NAME << ", "
<< WATERMARK_FIELD_NAME
<< " FROM $input"
<< " WHERE " << WATERMARK_FIELD_NAME << " IS NOT NULL;\n";
sb << "SELECT "
<< OFFSET_FIELD_NAME << ", "
<< "Unwrap(" << WATERMARK_FIELD_NAME << ") AS " << WATERMARK_FIELD_NAME
<< " FROM $output;\n";
<< watermarkExpr << " AS " << WATERMARK_FIELD_NAME
<< " FROM Input;\n";

TString result = sb;
LOG_ROW_DISPATCHER_DEBUG("Generated sql:\n" << result);
Expand Down
46 changes: 27 additions & 19 deletions ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,17 +219,31 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
Client->StartClientSession();
}

private:
void OnWatermark(const NYql::NUdf::TUnboxedValue& rowIdValue, const NYql::NUdf::TUnboxedValue& maybeWatermark) {
if (!maybeWatermark) {
return;
}
auto rowId = rowIdValue.Get<ui64>();
Offset = Self.Offsets->at(rowId);
auto watermark = TInstant::MicroSeconds(maybeWatermark.Get<ui64>());
if (Watermark < watermark) {
Watermark = watermark;
}
LOG_ROW_DISPATCHER_TRACE("OnWatermark, row id: " << rowId << ", watermark: " << watermark);
}

public:
void OnData(const NYql::NUdf::TUnboxedValue* value) override {
ui64 rowId;
TMaybe<ui64> watermarkUs;
if (value->IsEmbedded()) {
rowId = value->Get<ui64>();
} else if (value->IsBoxed()) {
if (value->GetListLength() == 1) {
rowId = value->GetElement(0).Get<ui64>();
} else if (value->GetListLength() == 2) {
rowId = value->GetElement(0).Get<ui64>();
watermarkUs = value->GetElement(1).Get<ui64>();
OnWatermark(value->GetElement(0), value->GetElement(1));
return;
} else {
Y_ENSURE(false, "Unexpected output schema size");
}
Expand All @@ -244,14 +258,6 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

FilteredOffsets.insert(Offset);
if (watermarkUs) {
WatermarksUs.push_back(*watermarkUs);

const auto watermark = WatermarksUs.empty() ? Nothing() : TMaybe<TInstant>{TInstant::MicroSeconds(WatermarksUs.back())};
LOG_ROW_DISPATCHER_TRACE("OnData, row id: " << rowId << ", offset: " << Offset << ", watermark: " << watermark);

return;
}

Y_DEFER {
// Values allocated on parser allocator and should be released
Expand All @@ -270,7 +276,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

void OnBatchFinish() override {
if (NewNumberRows == NumberRows && NewDataPackerSize == DataPackerSize && WatermarksUs.empty()) {
if (NewNumberRows == NumberRows && NewDataPackerSize == DataPackerSize && !Watermark) {
return;
}
if (const auto nextOffset = Client->GetNextMessageOffset(); nextOffset && Offset < *nextOffset) {
Expand All @@ -280,11 +286,10 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public

const auto numberRows = NewNumberRows - NumberRows;
const auto rowSize = NewDataPackerSize - DataPackerSize;
const auto watermark = WatermarksUs.empty() ? Nothing() : TMaybe<TInstant>{TInstant::MicroSeconds(WatermarksUs.back())};

LOG_ROW_DISPATCHER_TRACE("OnBatchFinish, offset: " << Offset << ", number rows: " << numberRows << ", row size: " << rowSize << ", watermark: " << watermark);
LOG_ROW_DISPATCHER_TRACE("OnBatchFinish, offset: " << Offset << ", number rows: " << numberRows << ", row size: " << rowSize << ", watermark: " << Watermark);

Client->AddDataToClient(Offset, numberRows, rowSize, watermark);
Client->AddDataToClient(Offset, numberRows, rowSize, Watermark);

NumberRows = NewNumberRows;
DataPackerSize = NewDataPackerSize;
Expand Down Expand Up @@ -313,15 +318,18 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

void FinishPacking() {
if (!DataPacker->IsEmpty() || !WatermarksUs.empty()) {
if (!DataPacker->IsEmpty() || !Watermark.Empty()) {
LOG_ROW_DISPATCHER_TRACE("FinishPacking, batch size: " << DataPackerSize << ", number rows: " << FilteredOffsets.size());
ClientData.emplace(NYql::MakeReadOnlyRope(DataPacker->Finish()), FilteredOffsets, WatermarksUs);
if (FilteredOffsets.empty()) {
FilteredOffsets.emplace(Offset);
}
ClientData.emplace(NYql::MakeReadOnlyRope(DataPacker->Finish()), std::move(FilteredOffsets), Watermark);
NumberRows = 0;
NewNumberRows = 0;
DataPackerSize = 0;
NewDataPackerSize = 0;
FilteredOffsets.clear();
WatermarksUs.clear();
Watermark.Clear();
}
}

Expand All @@ -343,7 +351,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
TVector<NYql::NUdf::TUnboxedValue> FilteredRow; // Temporary value holder for DataPacket
std::unique_ptr<NKikimr::NMiniKQL::TValuePackerTransport<true>> DataPacker;
TSet<ui64> FilteredOffsets; // Offsets of current batch in DataPacker
TVector<ui64> WatermarksUs;
TMaybe<TInstant> Watermark;
TQueue<TDataBatch> ClientData;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class IClientDataConsumer : public TThrRefBase {
struct TDataBatch {
TRope SerializedData;
TSet<ui64> Offsets;
TVector<ui64> WatermarksUs;
TMaybe<TInstant> Watermark;
};

class ITopicFormatHandler : public TNonCopyable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class TFormatHandlerFixture : public TBaseFixture {
using TCallback = std::function<void(NActors::TActorId, TQueue<TDataBatch>&&)>;
struct TMessages {
TVector<ui64> Offsets;
TVector<ui64> Watermark;
TMaybe<TInstant> Watermark;
TBatch Batch;
};

Expand Down Expand Up @@ -555,14 +555,14 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
auto messages = TVector<TMessages>{
{
{firstOffset + 2, firstOffset + 3},
{39'000'000, 40'000'000},
TInstant::Seconds(40),
TBatch()
.AddRow(TRow().AddString("1970-01-01T00:00:44Z"))
.AddRow(TRow().AddString("1970-01-01T00:00:45Z"))
},
{
{firstOffset + 4, firstOffset + 5},
{41'000'000, 42'000'000},
TInstant::Seconds(42),
TBatch()
.AddRow(TRow().AddString("1970-01-01T00:00:46Z"))
.AddRow(TRow().AddString("1970-01-01T00:00:47Z"))
Expand All @@ -584,18 +584,33 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
messages = TVector<TMessages>{
{
{firstOffset + 4, firstOffset + 5},
{41'000'000, 42'000'000},
TInstant::Seconds(42),
TBatch()
.AddRow(TRow().AddString("1970-01-01T00:00:46Z"))
.AddRow(TRow().AddString("1970-01-01T00:00:47Z"))
},
{
{firstOffset + 6, firstOffset + 7},
{43'000'000, 44'000'000},
TInstant::Seconds(44),
TBatch()
.AddRow(TRow().AddString("1970-01-01T00:00:48Z"))
.AddRow(TRow().AddString("1970-01-01T00:00:49Z"))
},
{
{firstOffset + 60, firstOffset + 70},
Nothing(),
TBatch()
.AddRow(TRow().AddString("1970-01-01T00:00:01Z")) // watermark = NULL
.AddRow(TRow().AddString("1970-01-01T00:00:02Z")) // watermark = NULL
},
{
{firstOffset + 600, firstOffset + 700, firstOffset + 800},
TInstant::Seconds(0),
TBatch()
.AddRow(TRow().AddString("1970-01-01T00:00:03Z")) // watermark = NULL
.AddRow(TRow().AddString("1970-01-01T00:00:05Z"))
.AddRow(TRow().AddString("1970-01-01T00:00:04Z")) // watermark = NULL
},
};
CheckSuccess(MakeClient(
{{"ts", "[DataType; String]"}},
Expand All @@ -617,6 +632,17 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
GetMessage(firstOffset + 7, R"({"ts": "1970-01-01T00:00:49Z"})"),
});

ParseMessages({
GetMessage(firstOffset + 60, R"({"ts": "1970-01-01T00:00:01Z"})"),
GetMessage(firstOffset + 70, R"({"ts": "1970-01-01T00:00:02Z"})"),
});

ParseMessages({
GetMessage(firstOffset + 600, R"({"ts": "1970-01-01T00:00:03Z"})"),
GetMessage(firstOffset + 700, R"({"ts": "1970-01-01T00:00:05Z"})"),
GetMessage(firstOffset + 800, R"({"ts": "1970-01-01T00:00:04Z"})"),
});

RemoveClient(ClientIds[1]);

ParseMessages({
Expand All @@ -635,14 +661,14 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {

auto messages = TVector<TMessages>{
{
{firstOffset + 2, firstOffset + 3},
{39'000'000, 40'000'000},
{firstOffset + 2},
TInstant::Seconds(40),
TBatch()
.AddRow(TRow().AddString("1970-01-01T00:00:44Z").AddUint64(1))
},
{
{firstOffset + 4, firstOffset + 5},
{41'000'000, 42'000'000},
{firstOffset + 4},
TInstant::Seconds(42),
TBatch()
.AddRow(TRow().AddString("1970-01-01T00:00:46Z").AddUint64(1))
},
Expand All @@ -662,14 +688,14 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {

messages = TVector<TMessages>{
{
{firstOffset + 4, firstOffset + 5},
{41'000'000, 42'000'000},
{firstOffset + 4},
TInstant::Seconds(42),
TBatch()
.AddRow(TRow().AddString("1970-01-01T00:00:46Z").AddUint64(1))
},
{
{firstOffset + 6, firstOffset + 7},
{43'000'000, 44'000'000},
{firstOffset + 6},
TInstant::Seconds(44),
TBatch()
.AddRow(TRow().AddString("1970-01-01T00:00:48Z").AddUint64(1))
},
Expand Down Expand Up @@ -712,13 +738,13 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {

auto messages = TVector<TMessages>{
{
{firstOffset + 2, firstOffset + 3},
{39'000'000, 40'000'000},
{firstOffset + 3},
TInstant::Seconds(40),
TBatch()
},
{
{firstOffset + 4, firstOffset + 5},
{41'000'000, 42'000'000},
{firstOffset + 5},
TInstant::Seconds(42),
TBatch()
},
};
Expand All @@ -737,13 +763,13 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {

messages = TVector<TMessages>{
{
{firstOffset + 4, firstOffset + 5},
{41'000'000, 42'000'000},
{firstOffset + 5},
TInstant::Seconds(42),
TBatch()
},
{
{firstOffset + 6, firstOffset + 7},
{43'000'000, 44'000'000},
{firstOffset + 7},
TInstant::Seconds(44),
TBatch()
},
};
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ void TTopicSession::SendData(TClientsInfo& info) {

ui64 batchSize = 0;
while (!buffer.empty()) {
auto [serializedData, offsets, watermarksUs] = std::move(buffer.front());
auto [serializedData, offsets, watermark] = std::move(buffer.front());
Y_ENSURE(!offsets.empty(), "Expected non empty message batch");
buffer.pop();

Expand All @@ -706,7 +706,9 @@ void TTopicSession::SendData(TClientsInfo& info) {
NFq::NRowDispatcherProto::TEvMessage message;
message.SetPayloadId(event->AddPayload(std::move(serializedData)));
message.MutableOffsets()->Assign(offsets.begin(), offsets.end());
message.MutableWatermarksUs()->Assign(watermarksUs.begin(), watermarksUs.end());
if (watermark) {
message.AddWatermarksUs(watermark->MicroSeconds());
}
event->Record.AddMessages()->CopyFrom(std::move(message));
event->Record.SetNextMessageOffset(*offsets.rbegin() + 1);

Expand Down
Loading