Skip to content

Commit 35c5a88

Browse files
authored
Remove query from Compute Scheduler in all cases (#27598) (#27732)
2 parents e4d2412 + e932c66 commit 35c5a88

File tree

8 files changed

+48
-39
lines changed

8 files changed

+48
-39
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
608608

609609
LWTRACK(KqpBaseExecuterHandleReady, ResponseEv->Orbit, TxId);
610610

611-
if (!databaseId.empty() && (poolId != NResourcePool::DEFAULT_POOL_ID || AccountDefaultPoolInScheduler)) {
611+
if (IsSchedulable()) {
612612
const auto schedulerServiceId = MakeKqpSchedulerServiceId(SelfId().NodeId());
613613

614614
// TODO: deliberately create the database here - since database doesn't have any useful scheduling properties for now.
@@ -1193,6 +1193,12 @@ class TKqpExecuterBase : public TActor<TDerived> {
11931193
Request.Transactions.crop(0);
11941194
this->Send(Target, ResponseEv.release());
11951195

1196+
if (IsSchedulable()) {
1197+
auto removeQueryEvent = MakeHolder<NScheduler::TEvRemoveQuery>();
1198+
removeQueryEvent->QueryId = TxId;
1199+
this->Send(MakeKqpSchedulerServiceId(SelfId().NodeId()), removeQueryEvent.Release());
1200+
}
1201+
11961202
for (auto channelPair: ResultChannelProxies) {
11971203
LOG_D("terminate result channel " << channelPair.first << " proxy at " << channelPair.second->SelfId());
11981204

@@ -1269,6 +1275,12 @@ class TKqpExecuterBase : public TActor<TDerived> {
12691275
return TasksGraph.ArenaSerializeTaskToProto(task, serializeAsyncIoSettings);
12701276
}
12711277

1278+
inline bool IsSchedulable() const {
1279+
const auto& databaseId = GetUserRequestContext()->DatabaseId;
1280+
const auto& poolId = GetUserRequestContext()->PoolId.empty() ? NResourcePool::DEFAULT_POOL_ID : GetUserRequestContext()->PoolId;
1281+
return !databaseId.empty() && (poolId != NResourcePool::DEFAULT_POOL_ID || AccountDefaultPoolInScheduler);
1282+
}
1283+
12721284
protected:
12731285
IKqpGateway::TExecPhysicalRequest Request;
12741286
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;

ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp renamed to ydb/core/kqp/executer_actor/kqp_executer_ut.cpp

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ Y_UNIT_TEST_SUITE(KqpExecuter) {
1717
- Start query execution and receive TEvTxRequest.
1818
- When sending TEvAddQuery from executer to scheduler, immediately receive TEvAbortExecution.
1919
- Imitate receiving TEvQueryResponse before receiving self TEvPoison by executer.
20+
- Check that scheduler got TEvRemoveQuery.
2021
- Do not crash or get undefined behavior.
2122
*/
2223
Y_UNIT_TEST(TestSuddenAbortAfterReady) {
@@ -27,27 +28,18 @@ Y_UNIT_TEST_SUITE(KqpExecuter) {
2728
auto db = kikimr.RunCall([&] { return kikimr.GetTableClient(); } );
2829
auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); } );
2930

30-
auto prepareResult = kikimr.RunCall([&] { return session.PrepareDataQuery(Q_(R"(
31-
SELECT COUNT(*) FROM `/Root/TwoShard`;
32-
)")).GetValueSync();
33-
});
34-
UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), EStatus::SUCCESS, prepareResult.GetIssues().ToString());
35-
auto dataQuery = prepareResult.GetQuery();
36-
3731
TActorId executerId, targetId;
32+
ui8 queries = 0;
3833
auto& runtime = *kikimr.GetTestServer().GetRuntime();
3934
runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
40-
{
41-
TStringStream ss;
42-
ss << "Got " << ev->GetTypeName() << " " << ev->Recipient << " " << ev->Sender << Endl;
43-
Cerr << ss.Str();
44-
}
35+
Cerr << (TStringBuilder() << "Got " << ev->GetTypeName() << " " << ev->Recipient << " " << ev->Sender << Endl);
4536

4637
if (ev->GetTypeRewrite() == TEvKqpExecuter::TEvTxRequest::EventType) {
4738
targetId = ActorIdFromProto(ev->Get<TEvKqpExecuter::TEvTxRequest>()->Record.GetTarget());
4839
}
4940

5041
if (ev->GetTypeRewrite() == NScheduler::TEvAddQuery::EventType) {
42+
++queries;
5143
executerId = ev->Sender;
5244
auto* abortExecution = new TEvKqp::TEvAbortExecution(NYql::NDqProto::StatusIds::UNSPECIFIED, NYql::TIssues());
5345
runtime.Send(new IEventHandle(ev->Sender, targetId, abortExecution));
@@ -60,11 +52,16 @@ Y_UNIT_TEST_SUITE(KqpExecuter) {
6052
return TTestActorRuntime::EEventAction::PROCESS;
6153
});
6254

63-
auto future = kikimr.RunInThreadPool([&] { return dataQuery.Execute(TTxControl::BeginTx().CommitTx(), TExecDataQuerySettings()).GetValueSync(); });
55+
auto future = kikimr.RunInThreadPool([&] {
56+
return session.ExecuteDataQuery("SELECT COUNT(*) FROM `/Root/TwoShard`;", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
57+
});
6458

6559
TDispatchOptions opts;
6660
opts.FinalEvents.emplace_back([&](IEventHandle& ev) {
67-
return ev.GetTypeRewrite() == TEvKqpExecuter::TEvTxResponse::EventType;
61+
if (ev.GetTypeRewrite() == NScheduler::TEvRemoveQuery::EventType) {
62+
--queries;
63+
}
64+
return (ev.GetTypeRewrite() == TEvKqpExecuter::TEvTxResponse::EventType || ev.GetTypeRewrite() == NScheduler::TEvRemoveQuery::EventType) && !queries;
6865
});
6966
runtime.DispatchEvents(opts);
7067

@@ -141,4 +138,4 @@ Y_UNIT_TEST_SUITE(KqpExecuter) {
141138
*/
142139
}
143140

144-
} // namespace NKikimr
141+
} // namespace NKikimr::NKqp

ydb/core/kqp/executer_actor/ut/ya.make

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,4 @@
1-
UNITTEST_FOR(ydb/core/kqp)
2-
3-
FORK_SUBTESTS()
4-
5-
IF (SANITIZER_TYPE OR WITH_VALGRIND)
6-
SIZE(MEDIUM)
7-
ENDIF()
1+
UNITTEST_FOR(ydb/core/kqp/executer_actor)
82

93
SRCS(
104
kqp_executer_ut.cpp

ydb/core/kqp/node_service/kqp_node_state.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,9 @@ class TState {
104104
ExpiringRequests.erase(expireIt);
105105
}
106106

107-
if (auto query = requestIt->second.Query) {
107+
if (requestIt->second.Query) {
108108
auto removeQueryEvent = MakeHolder<NScheduler::TEvRemoveQuery>();
109-
removeQueryEvent->Query = query;
109+
removeQueryEvent->QueryId = txId;
110110
const auto& actorCtx = NActors::TActorContext::AsActorContext();
111111
actorCtx.Send(MakeKqpSchedulerServiceId(actorCtx.SelfID.NodeId()), removeQueryEvent.Release());
112112
}

ydb/core/kqp/runtime/scheduler/kqp_compute_scheduler_service.cpp

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ class TComputeSchedulerService : public NActors::TActorBootstrapped<TComputeSche
118118
// TODO: Scheduler->UpdatePool(…);
119119
}
120120
} else {
121+
LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE_SCHEDULER, "Trying to remove unknown pool: " << databaseId << "/" << poolId);
121122
// TODO: the removing message for unknown pool - should we check?
122123
}
123124
}
@@ -137,7 +138,9 @@ class TComputeSchedulerService : public NActors::TActorBootstrapped<TComputeSche
137138
}
138139

139140
void Handle(TEvRemoveQuery::TPtr& ev) {
140-
Scheduler->RemoveQuery(ev->Get()->Query);
141+
if (!Scheduler->RemoveQuery(ev->Get()->QueryId)) {
142+
LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE_SCHEDULER, "Trying to remove unknown query: " << ev->Get()->QueryId);
143+
}
141144
}
142145

143146
void Handle(NActors::TEvents::TEvWakeup::TPtr&) {
@@ -226,7 +229,7 @@ void TComputeScheduler::AddOrUpdatePool(const TString& databaseId, const TString
226229
}
227230
}
228231

229-
TQueryPtr TComputeScheduler::AddOrUpdateQuery(const TString& databaseId, const TString& poolId, const NHdrf::TQueryId& queryId, const NHdrf::TStaticAttributes& attrs) {
232+
TQueryPtr TComputeScheduler::AddOrUpdateQuery(const NHdrf::TDatabaseId& databaseId, const NHdrf::TPoolId& poolId, const NHdrf::TQueryId& queryId, const NHdrf::TStaticAttributes& attrs) {
230233
Y_ENSURE(!poolId.empty());
231234

232235
TWriteGuard lock(Mutex);
@@ -248,14 +251,16 @@ TQueryPtr TComputeScheduler::AddOrUpdateQuery(const TString& databaseId, const T
248251
return query;
249252
}
250253

251-
void TComputeScheduler::RemoveQuery(const TQueryPtr& query) {
252-
Y_ENSURE(query);
253-
254+
bool TComputeScheduler::RemoveQuery(const NHdrf::TQueryId& queryId) {
254255
TWriteGuard lock(Mutex);
255-
const auto& queryId = std::get<NHdrf::TQueryId>(query->GetId());
256256

257-
Y_ENSURE(Queries.erase(queryId));
258-
query->GetParent()->RemoveQuery(queryId);
257+
if (auto queryIt = Queries.find(queryId); queryIt != Queries.end()) {
258+
queryIt->second->GetParent()->RemoveQuery(queryId);
259+
Queries.erase(queryIt);
260+
return true;
261+
}
262+
263+
return false;
259264
}
260265

261266
void TComputeScheduler::UpdateFairShare() {

ydb/core/kqp/runtime/scheduler/kqp_compute_scheduler_service.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ class TComputeScheduler : public std::enable_shared_from_this<TComputeScheduler>
1515
void SetTotalCpuLimit(ui64 cpu);
1616
ui64 GetTotalCpuLimit() const;
1717

18-
void AddOrUpdateDatabase(const TString& databaseId, const NHdrf::TStaticAttributes& attrs);
18+
void AddOrUpdateDatabase(const NHdrf::TDatabaseId& databaseId, const NHdrf::TStaticAttributes& attrs);
1919

20-
void AddOrUpdatePool(const TString& databaseId, const TString& poolId, const NHdrf::TStaticAttributes& attrs);
20+
void AddOrUpdatePool(const NHdrf::TDatabaseId& databaseId, const NHdrf::TPoolId& poolId, const NHdrf::TStaticAttributes& attrs);
2121

22-
NHdrf::NDynamic::TQueryPtr AddOrUpdateQuery(const TString& databaseId, const TString& poolId, const NHdrf::TQueryId& queryId, const NHdrf::TStaticAttributes& attrs);
23-
void RemoveQuery(const NHdrf::NDynamic::TQueryPtr& query);
22+
NHdrf::NDynamic::TQueryPtr AddOrUpdateQuery(const NHdrf::TDatabaseId& databaseId, const NHdrf::TPoolId& poolId, const NHdrf::TQueryId& queryId, const NHdrf::TStaticAttributes& attrs);
23+
bool RemoveQuery(const NHdrf::TQueryId& queryId);
2424

2525
void UpdateFairShare();
2626

@@ -89,7 +89,7 @@ struct TEvAddQuery : public TEventLocal<TEvAddQuery, TEvents::EvAddQuery> {
8989
};
9090

9191
struct TEvRemoveQuery : public TEventLocal<TEvRemoveQuery, TEvents::EvRemoveQuery> {
92-
NHdrf::NDynamic::TQueryPtr Query;
92+
NHdrf::TQueryId QueryId;
9393
};
9494

9595
struct TEvQueryResponse : public TEventLocal<TEvQueryResponse, TEvents::EvQueryResponse> {

ydb/core/kqp/runtime/scheduler/kqp_schedulable_task.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ struct TSchedulableTask : public std::enable_shared_from_this<TSchedulableTask>
2828
void IncreaseUsage();
2929
void DecreaseUsage(const TDuration& burstUsage, bool forcedResume);
3030

31-
// Returns parent pool's 'fair-share' - 'usage'
31+
// Returns parent pool's 'fair-share' minus 'usage'
3232
size_t GetSpareUsage() const;
3333

3434
// Account extra usage which doesn't affect scheduling

ydb/library/services/services.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ enum EServiceKikimr {
249249
KQP_SESSION = 545;
250250
KQP_COMPILE_COMPUTATION_PATTERN_SERVICE = 546;
251251
KQP_WORKLOAD_SERVICE = 547;
252+
KQP_COMPUTE_SCHEDULER = 548;
252253

253254
TABLET_RESOURCE_BROKER = 540;
254255

0 commit comments

Comments
 (0)