Skip to content

Commit 56ecfcb

Browse files
committed
Remove query from Compute Scheduler in all cases
1 parent 18ae5ce commit 56ecfcb

File tree

7 files changed

+38
-25
lines changed

7 files changed

+38
-25
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1197,6 +1197,12 @@ class TKqpExecuterBase : public TActor<TDerived> {
11971197
Request.Transactions.crop(0);
11981198
this->Send(Target, ResponseEv.release());
11991199

1200+
{
1201+
auto removeQueryEvent = MakeHolder<NScheduler::TEvRemoveQuery>();
1202+
removeQueryEvent->QueryId = TxId;
1203+
this->Send(MakeKqpSchedulerServiceId(SelfId().NodeId()), removeQueryEvent.Release());
1204+
}
1205+
12001206
for (auto channelPair: ResultChannelProxies) {
12011207
LOG_D("terminate result channel " << channelPair.first << " proxy at " << channelPair.second->SelfId());
12021208

ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,11 @@ Y_UNIT_TEST_SUITE(KqpExecuter) {
2727
auto db = kikimr.RunCall([&] { return kikimr.GetTableClient(); } );
2828
auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); } );
2929

30-
auto prepareResult = kikimr.RunCall([&] { return session.PrepareDataQuery(Q_(R"(
31-
SELECT COUNT(*) FROM `/Root/TwoShard`;
32-
)")).GetValueSync();
33-
});
34-
UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), EStatus::SUCCESS, prepareResult.GetIssues().ToString());
35-
auto dataQuery = prepareResult.GetQuery();
36-
3730
TActorId executerId, targetId;
31+
ui8 queries = 0;
3832
auto& runtime = *kikimr.GetTestServer().GetRuntime();
3933
runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
34+
// TODO: remove debug logging
4035
{
4136
TStringStream ss;
4237
ss << "Got " << ev->GetTypeName() << " " << ev->Recipient << " " << ev->Sender << Endl;
@@ -48,6 +43,7 @@ Y_UNIT_TEST_SUITE(KqpExecuter) {
4843
}
4944

5045
if (ev->GetTypeRewrite() == NScheduler::TEvAddQuery::EventType) {
46+
++queries;
5147
executerId = ev->Sender;
5248
auto* abortExecution = new TEvKqp::TEvAbortExecution(NYql::NDqProto::StatusIds::UNSPECIFIED, NYql::TIssues());
5349
runtime.Send(new IEventHandle(ev->Sender, targetId, abortExecution));
@@ -60,11 +56,16 @@ Y_UNIT_TEST_SUITE(KqpExecuter) {
6056
return TTestActorRuntime::EEventAction::PROCESS;
6157
});
6258

63-
auto future = kikimr.RunInThreadPool([&] { return dataQuery.Execute(TTxControl::BeginTx().CommitTx(), TExecDataQuerySettings()).GetValueSync(); });
59+
auto future = kikimr.RunInThreadPool([&] {
60+
return session.ExecuteDataQuery("SELECT COUNT(*) FROM `/Root/TwoShard`;", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
61+
});
6462

6563
TDispatchOptions opts;
6664
opts.FinalEvents.emplace_back([&](IEventHandle& ev) {
67-
return ev.GetTypeRewrite() == TEvKqpExecuter::TEvTxResponse::EventType;
65+
if (ev.GetTypeRewrite() == NScheduler::TEvRemoveQuery::EventType) {
66+
--queries;
67+
}
68+
return (ev.GetTypeRewrite() == TEvKqpExecuter::TEvTxResponse::EventType || ev.GetTypeRewrite() == NScheduler::TEvRemoveQuery::EventType) && !queries;
6869
});
6970
runtime.DispatchEvents(opts);
7071

ydb/core/kqp/node_service/kqp_node_state.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,9 @@ class TState {
103103
ExpiringRequests.erase(expireIt);
104104
}
105105

106-
if (auto query = requestIt->second.Query) {
106+
if (requestIt->second.Query) {
107107
auto removeQueryEvent = MakeHolder<NScheduler::TEvRemoveQuery>();
108-
removeQueryEvent->Query = query;
108+
removeQueryEvent->QueryId = txId;
109109
actorSystem->Send(MakeKqpSchedulerServiceId(actorSystem->NodeId), removeQueryEvent.Release());
110110
}
111111

ydb/core/kqp/runtime/scheduler/kqp_compute_scheduler_service.cpp

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ class TComputeSchedulerService : public NActors::TActorBootstrapped<TComputeSche
118118
// TODO: Scheduler->UpdatePool(…);
119119
}
120120
} else {
121+
LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE_SCHEDULER, "Trying to remove unknown pool: " << databaseId << "/" << poolId);
121122
// TODO: the removing message for unknown pool - should we check?
122123
}
123124
}
@@ -137,7 +138,9 @@ class TComputeSchedulerService : public NActors::TActorBootstrapped<TComputeSche
137138
}
138139

139140
void Handle(TEvRemoveQuery::TPtr& ev) {
140-
Scheduler->RemoveQuery(ev->Get()->Query);
141+
if (!Scheduler->RemoveQuery(ev->Get()->QueryId)) {
142+
LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE_SCHEDULER, "Trying to remove unknown query: " << ev->Get()->QueryId);
143+
}
141144
}
142145

143146
void Handle(NActors::TEvents::TEvWakeup::TPtr&) {
@@ -230,7 +233,7 @@ void TComputeScheduler::AddOrUpdatePool(const TString& databaseId, const TString
230233
}
231234
}
232235

233-
TQueryPtr TComputeScheduler::AddOrUpdateQuery(const TString& databaseId, const TString& poolId, const NHdrf::TQueryId& queryId, const NHdrf::TStaticAttributes& attrs) {
236+
TQueryPtr TComputeScheduler::AddOrUpdateQuery(const NHdrf::TDatabaseId& databaseId, const NHdrf::TPoolId& poolId, const NHdrf::TQueryId& queryId, const NHdrf::TStaticAttributes& attrs) {
234237
Y_ENSURE(!poolId.empty());
235238

236239
TWriteGuard lock(Mutex);
@@ -253,14 +256,16 @@ TQueryPtr TComputeScheduler::AddOrUpdateQuery(const TString& databaseId, const T
253256
return query;
254257
}
255258

256-
void TComputeScheduler::RemoveQuery(const TQueryPtr& query) {
257-
Y_ENSURE(query);
258-
259+
bool TComputeScheduler::RemoveQuery(const NHdrf::TQueryId& queryId) {
259260
TWriteGuard lock(Mutex);
260-
const auto& queryId = std::get<NHdrf::TQueryId>(query->GetId());
261261

262-
Y_ENSURE(Queries.erase(queryId));
263-
query->GetParent()->RemoveQuery(queryId);
262+
if (auto queryIt = Queries.find(queryId); queryIt != Queries.end()) {
263+
queryIt->second->GetParent()->RemoveQuery(queryId);
264+
Queries.erase(queryIt);
265+
return true;
266+
}
267+
268+
return false;
264269
}
265270

266271
void TComputeScheduler::UpdateFairShare(bool allowFairShareOverlimit) {

ydb/core/kqp/runtime/scheduler/kqp_compute_scheduler_service.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ class TComputeScheduler : public std::enable_shared_from_this<TComputeScheduler>
1515
void SetTotalCpuLimit(ui64 cpu);
1616
ui64 GetTotalCpuLimit() const;
1717

18-
void AddOrUpdateDatabase(const TString& databaseId, const NHdrf::TStaticAttributes& attrs);
18+
void AddOrUpdateDatabase(const NHdrf::TDatabaseId& databaseId, const NHdrf::TStaticAttributes& attrs);
1919

20-
void AddOrUpdatePool(const TString& databaseId, const TString& poolId, const NHdrf::TStaticAttributes& attrs);
20+
void AddOrUpdatePool(const NHdrf::TDatabaseId& databaseId, const NHdrf::TPoolId& poolId, const NHdrf::TStaticAttributes& attrs);
2121

22-
NHdrf::NDynamic::TQueryPtr AddOrUpdateQuery(const TString& databaseId, const TString& poolId, const NHdrf::TQueryId& queryId, const NHdrf::TStaticAttributes& attrs);
23-
void RemoveQuery(const NHdrf::NDynamic::TQueryPtr& query);
22+
NHdrf::NDynamic::TQueryPtr AddOrUpdateQuery(const NHdrf::TDatabaseId& databaseId, const NHdrf::TPoolId& poolId, const NHdrf::TQueryId& queryId, const NHdrf::TStaticAttributes& attrs);
23+
bool RemoveQuery(const NHdrf::TQueryId& queryId);
2424

2525
// We want to allow FairShare to be over Limit.
2626
// If you need to change this behaviour change variable's default value
@@ -91,7 +91,7 @@ struct TEvAddQuery : public TEventLocal<TEvAddQuery, TEvents::EvAddQuery> {
9191
};
9292

9393
struct TEvRemoveQuery : public TEventLocal<TEvRemoveQuery, TEvents::EvRemoveQuery> {
94-
NHdrf::NDynamic::TQueryPtr Query;
94+
NHdrf::TQueryId QueryId;
9595
};
9696

9797
struct TEvQueryResponse : public TEventLocal<TEvQueryResponse, TEvents::EvQueryResponse> {

ydb/core/kqp/runtime/scheduler/kqp_schedulable_task.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ struct TSchedulableTask : public std::enable_shared_from_this<TSchedulableTask>
2828
void IncreaseUsage();
2929
void DecreaseUsage(const TDuration& burstUsage, bool forcedResume);
3030

31-
// Returns parent pool's 'fair-share' - 'usage'
31+
// Returns parent pool's 'fair-share' minus 'usage'
3232
size_t GetSpareUsage() const;
3333

3434
// Account extra usage which doesn't affect scheduling

ydb/library/services/services.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ enum EServiceKikimr {
256256
KQP_SESSION = 545;
257257
KQP_COMPILE_COMPUTATION_PATTERN_SERVICE = 546;
258258
KQP_WORKLOAD_SERVICE = 547;
259+
KQP_COMPUTE_SCHEDULER = 548;
259260

260261
TABLET_RESOURCE_BROKER = 540;
261262

0 commit comments

Comments
 (0)