Skip to content

Commit ab58703

Browse files
dahbka-lisCopilot
andauthored
Impl methods to detect BulkUpsert with DEFAULT columns and a flag to disable (#27596)
Co-authored-by: Copilot <[email protected]>
1 parent cf7b87d commit ab58703

File tree

6 files changed

+137
-3
lines changed

6 files changed

+137
-3
lines changed

ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp

Lines changed: 104 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -782,12 +782,15 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {
782782
TKikimrRunner kikimr(TKikimrSettings()
783783
.SetUseRealThreads(false)
784784
.SetEnableAddColumsWithDefaults(true)
785+
.SetDisableMissingDefaultColumnsInBulkUpsert(true)
785786
.SetWithSampleTables(false));
786787

787788
auto db = kikimr.RunCall([&] { return kikimr.GetQueryClient(); } );
788789
auto session = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); } );
789790
auto querySession = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); } );
790791

792+
auto tableClient = kikimr.RunCall([&] { return kikimr.GetTableClient(); } );
793+
791794
auto& runtime = *kikimr.GetTestServer().GetRuntime();
792795

793796
{
@@ -860,7 +863,7 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {
860863

861864
auto alterQuery = R"(
862865
ALTER TABLE `/Root/AddNonColumnDoesnotReturnInternalError`
863-
ADD COLUMN Value3 Int32 NOT NULL DEFAULT 7;
866+
ADD COLUMN Value3 Int32 DEFAULT 7;
864867
)";
865868

866869
auto alterFuture = kikimr.RunInThreadPool([&] { return session.ExecuteQuery(alterQuery, TTxControl::NoTx()).GetValueSync(); });
@@ -894,6 +897,29 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {
894897
UNIT_ASSERT_STRING_CONTAINS(result, R"([[1u;"Changed";"Updated"];[2u;"New";"text"]])");
895898
}
896899

900+
{
901+
auto rowsBuilder = NYdb::TValueBuilder();
902+
rowsBuilder.BeginList();
903+
for (ui32 i = 10; i <= 15; ++i) {
904+
rowsBuilder.AddListItem()
905+
.BeginStruct()
906+
.AddMember("Key")
907+
.Uint32(i)
908+
.AddMember("Value")
909+
.String("String")
910+
.AddMember("Value2")
911+
.String("String2")
912+
.EndStruct();
913+
914+
}
915+
rowsBuilder.EndList();
916+
auto result = kikimr.RunCall([&] {
917+
return tableClient.BulkUpsert("/Root/AddNonColumnDoesnotReturnInternalError", rowsBuilder.Build()).GetValueSync();
918+
});
919+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SCHEME_ERROR, result.GetIssues().ToString());
920+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Missing default columns: Value3");
921+
}
922+
897923
{
898924
TString result = fQuery(R"(
899925
UPSERT INTO `/Root/AddNonColumnDoesnotReturnInternalError` (Key, Value, Value2, Value3) VALUES (1, "4", "four", 1);
@@ -933,8 +959,8 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {
933959

934960
auto result = runtime.WaitFuture(alterFuture);
935961
fCompareTable(R"([
936-
[1u;"Changed";"Updated";7];
937-
[2u;"New";"text";7]
962+
[1u;"Changed";"Updated";[7]];
963+
[2u;"New";"text";[7]]
938964
])");
939965
}
940966

@@ -1513,6 +1539,81 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {
15131539
}
15141540
}
15151541

1542+
Y_UNIT_TEST_TWIN(DefaultColumnAndBulkUpsert, DisableMissingDefaultColumnsInBulkUpsert) {
1543+
TKikimrRunner kikimr(TKikimrSettings()
1544+
.SetEnableAddColumsWithDefaults(true)
1545+
.SetDisableMissingDefaultColumnsInBulkUpsert(DisableMissingDefaultColumnsInBulkUpsert)
1546+
.SetWithSampleTables(false));
1547+
1548+
auto queryClient = kikimr.GetQueryClient();
1549+
auto tableClient = kikimr.GetTableClient();
1550+
1551+
{
1552+
auto query = R"(
1553+
CREATE TABLE `/Root/DefaultColumnAndBulkUpsert` (
1554+
Key Uint32 NOT NULL,
1555+
Value1 String DEFAULT "Default value",
1556+
Value2 Int64 DEFAULT 123,
1557+
PRIMARY KEY (Key),
1558+
);
1559+
)";
1560+
1561+
auto result = queryClient.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
1562+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
1563+
}
1564+
1565+
{
1566+
auto query = R"(
1567+
UPSERT INTO `/Root/DefaultColumnAndBulkUpsert` (Key) VALUES (1), (2);
1568+
)";
1569+
1570+
auto result = queryClient.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
1571+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
1572+
}
1573+
1574+
{
1575+
auto query = R"(
1576+
UPSERT INTO `/Root/DefaultColumnAndBulkUpsert` (Key, Value1) VALUES (3, "Value1"), (4, "Value2");
1577+
)";
1578+
1579+
auto result = queryClient.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
1580+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
1581+
}
1582+
1583+
{
1584+
auto query = R"(
1585+
ALTER TABLE `/Root/DefaultColumnAndBulkUpsert` ADD COLUMN Value3 Utf8 DEFAULT "Value3"u;
1586+
)";
1587+
1588+
auto result = queryClient.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
1589+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
1590+
}
1591+
1592+
{
1593+
auto rowsBuilder = NYdb::TValueBuilder();
1594+
rowsBuilder.BeginList();
1595+
for (ui32 i = 10; i <= 15; ++i) {
1596+
rowsBuilder.AddListItem()
1597+
.BeginStruct()
1598+
.AddMember("Key")
1599+
.Uint32(i)
1600+
.AddMember("Value2")
1601+
.OptionalInt64(0)
1602+
.EndStruct();
1603+
1604+
}
1605+
rowsBuilder.EndList();
1606+
1607+
auto result = tableClient.BulkUpsert("/Root/DefaultColumnAndBulkUpsert", rowsBuilder.Build()).ExtractValueSync();
1608+
if (DisableMissingDefaultColumnsInBulkUpsert) {
1609+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SCHEME_ERROR, result.GetIssues().ToString());
1610+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Missing default columns: Value3, Value1");
1611+
} else {
1612+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
1613+
}
1614+
}
1615+
}
1616+
15161617
// Y_UNIT_TEST(SetNotNull) {
15171618
// struct TValue {
15181619
// private:

ydb/core/protos/feature_flags.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,4 +238,5 @@ message TFeatureFlags {
238238
optional bool EnableTopicMessageLevelParallelism = 212 [default = false];
239239
optional bool EnableOlapRejectProbability = 213 [default = false];
240240
optional bool EnablePDiskLogForSmallDisks = 214 [default = false];
241+
optional bool DisableMissingDefaultColumnsInBulkUpsert = 215 [default = false];
241242
}

ydb/core/testlib/basics/feature_flags.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ class TTestFeatureFlagsHolder {
8383
FEATURE_FLAG_SETTER(EnableDataShardWriteAlwaysVolatile)
8484
FEATURE_FLAG_SETTER(EnableStreamingQueries)
8585
FEATURE_FLAG_SETTER(EnableSecureScriptExecutions)
86+
FEATURE_FLAG_SETTER(DisableMissingDefaultColumnsInBulkUpsert)
8687

8788
#undef FEATURE_FLAG_SETTER
8889
};

ydb/core/tx/tx_proxy/upload_rows_common_impl.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
381381
THashMap<TString, ui32> columnByName;
382382
THashSet<TString> keyColumnsLeft;
383383
THashSet<TString> notNullColumnsLeft = entry.NotNullColumns;
384+
THashSet<TString> defaultColumnsLeft;
384385
SrcColumns.reserve(entry.Columns.size());
385386
THashSet<TString> HasInternalConversion;
386387

@@ -400,6 +401,10 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
400401
keyColumnIds[keyOrder] = id;
401402
keyColumnsLeft.insert(name);
402403
}
404+
405+
if (colInfo.IsDefaultFromLiteral()) {
406+
defaultColumnsLeft.insert(name);
407+
}
403408
}
404409

405410
if (entry.ColumnTableInfo) {
@@ -498,6 +503,10 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
498503
NotNullColumns.emplace(ci.Name);
499504
}
500505

506+
if (defaultColumnsLeft.contains(ci.Name)) {
507+
defaultColumnsLeft.erase(ci.Name);
508+
}
509+
501510
if (ci.KeyOrder != -1) {
502511
KeyColumnPositions[ci.KeyOrder] = TFieldDescription{ci.Id, ci.Name, (ui32)pos, ci.PType, pgTypeMod, notNull};
503512
keyColumnsLeft.erase(ci.Name);
@@ -582,6 +591,21 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
582591
return TConclusionStatus::Fail(Sprintf("Missing not null columns: %s", JoinSeq(", ", notNullColumnsLeft).c_str()));
583592
}
584593

594+
if (!defaultColumnsLeft.empty() && UpsertIfExists) {
595+
// some default columns are not specified in the request, but upsert will only update existing rows
596+
// and only the columns specified in the request will be updated; unspecified default columns will not be changed.
597+
defaultColumnsLeft.clear();
598+
}
599+
600+
if (!defaultColumnsLeft.empty()) {
601+
if (AppData(ctx)->FeatureFlags.GetDisableMissingDefaultColumnsInBulkUpsert()) {
602+
return TConclusionStatus::Fail(Sprintf("Missing default columns: %s", JoinSeq(", ", defaultColumnsLeft).c_str()));
603+
}
604+
605+
UploadCounters.OnMissingDefaultColumns();
606+
LOG_WARN_S(ctx, NKikimrServices::RPC_REQUEST, "Missing default columns: " << JoinSeq(", ", defaultColumnsLeft).c_str());
607+
}
608+
585609
TConclusionStatus res = TConclusionStatus::Success();
586610
if (isColumnTable && HasAppData() && AppDataVerified().ColumnShardConfig.GetBulkUpsertRequireAllColumns()) {
587611
res = CheckRequiredColumns(entry, *reqColumns);

ydb/core/tx/tx_proxy/upload_rows_counters.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,5 +99,6 @@ TUploadCounters::TUploadCounters()
9999
WrittenBytes = TBase::GetDeriviative("Replies/WrittenBytes");
100100
FailedBytes = TBase::GetDeriviative("Replies/FailedBytes");
101101
RequestsBytes = TBase::GetDeriviative("Requests/Bytes");
102+
MissingDefaultColumnsCount = TBase::GetDeriviative("MissingDefaultColumns/Count");
102103
}
103104
}

ydb/core/tx/tx_proxy/upload_rows_counters.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ class TUploadCounters: public NColumnShard::TCommonCountersOwner {
7676
NMonitoring::TDynamicCounters::TCounterPtr FailedBytes;
7777
NMonitoring::TDynamicCounters::TCounterPtr RequestsBytes;
7878

79+
NMonitoring::TDynamicCounters::TCounterPtr MissingDefaultColumnsCount;
80+
7981
THashMap<TUploadStatus, NMonitoring::TDynamicCounters::TCounterPtr, TUploadStatus::THasher> CodesCount;
8082

8183
NMonitoring::TDynamicCounters::TCounterPtr GetCodeCounter(const TUploadStatus& status);
@@ -142,6 +144,10 @@ class TUploadCounters: public NColumnShard::TCommonCountersOwner {
142144
PackageSizeCountByRecords->Collect(rowsCount);
143145
RequestsBytes->Add(requestBytes);
144146
}
147+
148+
void OnMissingDefaultColumns() {
149+
MissingDefaultColumnsCount->Inc();
150+
}
145151
};
146152

147153
} // namespace NKikimr

0 commit comments

Comments
 (0)