Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ class TKqpExecuterBase : public TActor<TDerived> {

LWTRACK(KqpBaseExecuterHandleReady, ResponseEv->Orbit, TxId);

if (!databaseId.empty() && (poolId != NResourcePool::DEFAULT_POOL_ID || AccountDefaultPoolInScheduler)) {
if (IsSchedulable()) {
const auto schedulerServiceId = MakeKqpSchedulerServiceId(SelfId().NodeId());

// TODO: deliberately create the database here - since database doesn't have any useful scheduling properties for now.
Expand Down Expand Up @@ -1197,6 +1197,12 @@ class TKqpExecuterBase : public TActor<TDerived> {
Request.Transactions.crop(0);
this->Send(Target, ResponseEv.release());

if (IsSchedulable()) {
auto removeQueryEvent = MakeHolder<NScheduler::TEvRemoveQuery>();
removeQueryEvent->QueryId = TxId;
this->Send(MakeKqpSchedulerServiceId(SelfId().NodeId()), removeQueryEvent.Release());
}

for (auto channelPair: ResultChannelProxies) {
LOG_D("terminate result channel " << channelPair.first << " proxy at " << channelPair.second->SelfId());

Expand Down Expand Up @@ -1273,6 +1279,12 @@ class TKqpExecuterBase : public TActor<TDerived> {
return TasksGraph.ArenaSerializeTaskToProto(task, serializeAsyncIoSettings);
}

// Tells whether this executer's query should be registered with (and later removed from)
// the compute scheduler.
// Returns true only when the user request context has a non-empty database id and either
// the query runs outside the default resource pool or AccountDefaultPoolInScheduler is set.
// An empty pool id is treated as NResourcePool::DEFAULT_POOL_ID.
inline bool IsSchedulable() const {
    const auto& userContext = GetUserRequestContext();
    if (userContext->DatabaseId.empty()) {
        return false;
    }

    const auto& requestedPool = userContext->PoolId;
    const bool usesDefaultPool = requestedPool.empty() || requestedPool == NResourcePool::DEFAULT_POOL_ID;
    return !usesDefaultPool || AccountDefaultPoolInScheduler;
}

protected:
IKqpGateway::TExecPhysicalRequest Request;
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Y_UNIT_TEST_SUITE(KqpExecuter) {
- Start query execution and receive TEvTxRequest.
- When sending TEvAddQuery from executer to scheduler, immediately receive TEvAbortExecution.
- Imitate the executer receiving TEvQueryResponse before it receives its own TEvPoison.
- Check that scheduler got TEvRemoveQuery.
- Do not crash or get undefined behavior.
*/
Y_UNIT_TEST(TestSuddenAbortAfterReady) {
Expand All @@ -27,27 +28,18 @@ Y_UNIT_TEST_SUITE(KqpExecuter) {
auto db = kikimr.RunCall([&] { return kikimr.GetTableClient(); } );
auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); } );

auto prepareResult = kikimr.RunCall([&] { return session.PrepareDataQuery(Q_(R"(
SELECT COUNT(*) FROM `/Root/TwoShard`;
)")).GetValueSync();
});
UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), EStatus::SUCCESS, prepareResult.GetIssues().ToString());
auto dataQuery = prepareResult.GetQuery();

TActorId executerId, targetId;
ui8 queries = 0;
auto& runtime = *kikimr.GetTestServer().GetRuntime();
runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
{
TStringStream ss;
ss << "Got " << ev->GetTypeName() << " " << ev->Recipient << " " << ev->Sender << Endl;
Cerr << ss.Str();
}
Cerr << (TStringBuilder() << "Got " << ev->GetTypeName() << " " << ev->Recipient << " " << ev->Sender << Endl);

if (ev->GetTypeRewrite() == TEvKqpExecuter::TEvTxRequest::EventType) {
targetId = ActorIdFromProto(ev->Get<TEvKqpExecuter::TEvTxRequest>()->Record.GetTarget());
}

if (ev->GetTypeRewrite() == NScheduler::TEvAddQuery::EventType) {
++queries;
executerId = ev->Sender;
auto* abortExecution = new TEvKqp::TEvAbortExecution(NYql::NDqProto::StatusIds::UNSPECIFIED, NYql::TIssues());
runtime.Send(new IEventHandle(ev->Sender, targetId, abortExecution));
Expand All @@ -60,11 +52,16 @@ Y_UNIT_TEST_SUITE(KqpExecuter) {
return TTestActorRuntime::EEventAction::PROCESS;
});

auto future = kikimr.RunInThreadPool([&] { return dataQuery.Execute(TTxControl::BeginTx().CommitTx(), TExecDataQuerySettings()).GetValueSync(); });
auto future = kikimr.RunInThreadPool([&] {
return session.ExecuteDataQuery("SELECT COUNT(*) FROM `/Root/TwoShard`;", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
});

TDispatchOptions opts;
opts.FinalEvents.emplace_back([&](IEventHandle& ev) {
return ev.GetTypeRewrite() == TEvKqpExecuter::TEvTxResponse::EventType;
if (ev.GetTypeRewrite() == NScheduler::TEvRemoveQuery::EventType) {
--queries;
}
return (ev.GetTypeRewrite() == TEvKqpExecuter::TEvTxResponse::EventType || ev.GetTypeRewrite() == NScheduler::TEvRemoveQuery::EventType) && !queries;
});
runtime.DispatchEvents(opts);

Expand Down Expand Up @@ -141,4 +138,4 @@ Y_UNIT_TEST_SUITE(KqpExecuter) {
*/
}

} // namespace NKikimr
} // namespace NKikimr::NKqp
8 changes: 1 addition & 7 deletions ydb/core/kqp/executer_actor/ut/ya.make
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
UNITTEST_FOR(ydb/core/kqp)

FORK_SUBTESTS()

IF (SANITIZER_TYPE OR WITH_VALGRIND)
SIZE(MEDIUM)
ENDIF()
UNITTEST_FOR(ydb/core/kqp/executer_actor)

SRCS(
kqp_executer_ut.cpp
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/node_service/kqp_node_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ class TState {
ExpiringRequests.erase(expireIt);
}

if (auto query = requestIt->second.Query) {
if (requestIt->second.Query) {
auto removeQueryEvent = MakeHolder<NScheduler::TEvRemoveQuery>();
removeQueryEvent->Query = query;
removeQueryEvent->QueryId = txId;
actorSystem->Send(MakeKqpSchedulerServiceId(actorSystem->NodeId), removeQueryEvent.Release());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class TComputeSchedulerService : public NActors::TActorBootstrapped<TComputeSche
// TODO: Scheduler->UpdatePool(…);
}
} else {
LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE_SCHEDULER, "Trying to remove unknown pool: " << databaseId << "/" << poolId);
// TODO: the removing message for unknown pool - should we check?
}
}
Expand All @@ -137,7 +138,9 @@ class TComputeSchedulerService : public NActors::TActorBootstrapped<TComputeSche
}

void Handle(TEvRemoveQuery::TPtr& ev) {
Scheduler->RemoveQuery(ev->Get()->Query);
if (!Scheduler->RemoveQuery(ev->Get()->QueryId)) {
LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE_SCHEDULER, "Trying to remove unknown query: " << ev->Get()->QueryId);
}
}

void Handle(NActors::TEvents::TEvWakeup::TPtr&) {
Expand Down Expand Up @@ -230,7 +233,7 @@ void TComputeScheduler::AddOrUpdatePool(const TString& databaseId, const TString
}
}

TQueryPtr TComputeScheduler::AddOrUpdateQuery(const TString& databaseId, const TString& poolId, const NHdrf::TQueryId& queryId, const NHdrf::TStaticAttributes& attrs) {
TQueryPtr TComputeScheduler::AddOrUpdateQuery(const NHdrf::TDatabaseId& databaseId, const NHdrf::TPoolId& poolId, const NHdrf::TQueryId& queryId, const NHdrf::TStaticAttributes& attrs) {
Y_ENSURE(!poolId.empty());

TWriteGuard lock(Mutex);
Expand All @@ -253,14 +256,16 @@ TQueryPtr TComputeScheduler::AddOrUpdateQuery(const TString& databaseId, const T
return query;
}

void TComputeScheduler::RemoveQuery(const TQueryPtr& query) {
Y_ENSURE(query);

bool TComputeScheduler::RemoveQuery(const NHdrf::TQueryId& queryId) {
TWriteGuard lock(Mutex);
const auto& queryId = std::get<NHdrf::TQueryId>(query->GetId());

Y_ENSURE(Queries.erase(queryId));
query->GetParent()->RemoveQuery(queryId);
if (auto queryIt = Queries.find(queryId); queryIt != Queries.end()) {
queryIt->second->GetParent()->RemoveQuery(queryId);
Queries.erase(queryIt);
return true;
}

return false;
}

void TComputeScheduler::UpdateFairShare(bool allowFairShareOverlimit) {
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/kqp/runtime/scheduler/kqp_compute_scheduler_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ class TComputeScheduler : public std::enable_shared_from_this<TComputeScheduler>
void SetTotalCpuLimit(ui64 cpu);
ui64 GetTotalCpuLimit() const;

void AddOrUpdateDatabase(const TString& databaseId, const NHdrf::TStaticAttributes& attrs);
void AddOrUpdateDatabase(const NHdrf::TDatabaseId& databaseId, const NHdrf::TStaticAttributes& attrs);

void AddOrUpdatePool(const TString& databaseId, const TString& poolId, const NHdrf::TStaticAttributes& attrs);
void AddOrUpdatePool(const NHdrf::TDatabaseId& databaseId, const NHdrf::TPoolId& poolId, const NHdrf::TStaticAttributes& attrs);

NHdrf::NDynamic::TQueryPtr AddOrUpdateQuery(const TString& databaseId, const TString& poolId, const NHdrf::TQueryId& queryId, const NHdrf::TStaticAttributes& attrs);
void RemoveQuery(const NHdrf::NDynamic::TQueryPtr& query);
NHdrf::NDynamic::TQueryPtr AddOrUpdateQuery(const NHdrf::TDatabaseId& databaseId, const NHdrf::TPoolId& poolId, const NHdrf::TQueryId& queryId, const NHdrf::TStaticAttributes& attrs);
bool RemoveQuery(const NHdrf::TQueryId& queryId);

// We want to allow FairShare to be over Limit.
// If you need to change this behaviour change variable's default value
Expand Down Expand Up @@ -91,7 +91,7 @@ struct TEvAddQuery : public TEventLocal<TEvAddQuery, TEvents::EvAddQuery> {
};

struct TEvRemoveQuery : public TEventLocal<TEvRemoveQuery, TEvents::EvRemoveQuery> {
NHdrf::NDynamic::TQueryPtr Query;
NHdrf::TQueryId QueryId;
};

struct TEvQueryResponse : public TEventLocal<TEvQueryResponse, TEvents::EvQueryResponse> {
Expand Down
32 changes: 16 additions & 16 deletions ydb/core/kqp/runtime/scheduler/kqp_compute_scheduler_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ namespace {
UNIT_ASSERT_LE(querySnapshot->FairShare, kQueryDemand);
}
}

auto* poolSnapshot = queries.front()->GetSnapshot()->GetParent();
UNIT_ASSERT(poolSnapshot);
UNIT_ASSERT_VALUES_EQUAL(poolSnapshot->FairShare, kCpuLimit);
Expand Down Expand Up @@ -170,7 +170,7 @@ namespace {
for (const auto& poolId : poolIds) {
scheduler.AddOrUpdatePool(databaseId, poolId, {});
}

std::vector<std::vector<NHdrf::NDynamic::TQueryPtr>> queries;
std::vector<std::vector<TSchedulableTaskPtr>> tasks;
NHdrf::TQueryId queryId = 0;
Expand All @@ -191,7 +191,7 @@ namespace {
auto* poolSnapshot2 = queries[1].front()->GetSnapshot()->GetParent();
UNIT_ASSERT(poolSnapshot2);
UNIT_ASSERT_VALUES_EQUAL(poolSnapshot2->Demand, kCpuLimit);

auto* databaseSnapshot = poolSnapshot1->GetParent();
UNIT_ASSERT(databaseSnapshot);
UNIT_ASSERT_VALUES_EQUAL(databaseSnapshot->Demand, kCpuLimit);
Expand All @@ -202,7 +202,7 @@ namespace {
Scenario:
- 1 database with 1 pool and 3 queries with demand 4
- CPU limit is less than sum of demands so the last query can't get full satisfaction
- Checking that each query get 1 demand and than two queries get full demand while the last only gets what lasts
- Checking that each query gets 1 demand and then two queries get full demand while the last only gets what is left
*/
constexpr ui64 kCpuLimit = 10;
constexpr size_t kNQueries = 3;
Expand All @@ -215,7 +215,7 @@ namespace {
};
TComputeScheduler scheduler(options.Counters, options.DelayParams);
scheduler.SetTotalCpuLimit(kCpuLimit);

const TString databaseId = "db1";
scheduler.AddOrUpdateDatabase(databaseId, {});

Expand Down Expand Up @@ -337,7 +337,7 @@ namespace {
std::vector<std::vector<TSchedulableTaskPtr>> tasks;

std::vector<ui64> queryDemands = {6, 3, 3};

for (NHdrf::TQueryId queryId = 0; queryId < kNQueries; ++queryId) {
auto query = queries.emplace_back(
scheduler.AddOrUpdateQuery(databaseId, pools[queryId], queryId, {})
Expand Down Expand Up @@ -684,7 +684,7 @@ namespace {
Scenario:
- 1 database with 1 pool and 3 queries with demand 5
- CPU limit is less than demand so FairShare is distributed in FIFO order with at least 1 FairShare for each query
- After adding one more query FairShare is still distributed in FIFO order but second query gets less
- After adding one more query FairShare is still distributed in FIFO order but second query gets less
- Decreasing Demand for the first query should affect distribution by giving more for the next
*/
constexpr ui64 kCpuLimit = 10;
Expand All @@ -696,13 +696,13 @@ namespace {
.DelayParams = kDefaultDelayParams,
.UpdateFairSharePeriod = kDefaultUpdateFairSharePeriod
};

TComputeScheduler scheduler(options.Counters, options.DelayParams);
scheduler.SetTotalCpuLimit(kCpuLimit);

const TString databaseId = "db1";
scheduler.AddOrUpdateDatabase(databaseId, {});

const TString poolId = "pool1";
scheduler.AddOrUpdatePool(databaseId, poolId, {});

Expand Down Expand Up @@ -778,7 +778,7 @@ namespace {

const TString databaseId = "db1";
scheduler.AddOrUpdateDatabase(databaseId, {});

const TString poolId = "pool1";
scheduler.AddOrUpdatePool(databaseId, poolId, {});

Expand Down Expand Up @@ -807,7 +807,7 @@ namespace {
UNIT_ASSERT(databaseSnapshot);
UNIT_ASSERT_VALUES_EQUAL(databaseSnapshot->FairShare, kCpuLimit);

scheduler.RemoveQuery(queries[0]);
scheduler.RemoveQuery(std::get<NHdrf::TQueryId>(queries[0]->GetId()));
queries.erase(queries.begin());

scheduler.UpdateFairShare();
Expand Down Expand Up @@ -851,7 +851,7 @@ namespace {

const TString databaseId = "db1";
scheduler.AddOrUpdateDatabase(databaseId, {});

std::vector<TString> pools = {"pool1", "pool2", "pool3"};
for (size_t i = 0; i < pools.size(); ++i) {
scheduler.AddOrUpdatePool(databaseId, pools[i], {});
Expand Down Expand Up @@ -946,16 +946,16 @@ namespace {

const TString databaseId = "db1";
scheduler.AddOrUpdateDatabase(databaseId, {});

const TString poolId = "pool1";
scheduler.AddOrUpdatePool(databaseId, poolId, {});

NHdrf::TQueryId queryId = 1;
NHdrf::NDynamic::TQueryPtr query = scheduler.AddOrUpdateQuery(databaseId, poolId, queryId, {});

UNIT_ASSERT_NO_EXCEPTION(scheduler.RemoveQuery(query));
UNIT_ASSERT_EXCEPTION(scheduler.RemoveQuery(nullptr), yexception);
UNIT_ASSERT_EXCEPTION(scheduler.RemoveQuery(query), yexception);
UNIT_ASSERT_NO_EXCEPTION(scheduler.RemoveQuery(std::get<NHdrf::TQueryId>(query->GetId())));
UNIT_ASSERT_NO_EXCEPTION(scheduler.RemoveQuery(0));
UNIT_ASSERT_NO_EXCEPTION(scheduler.RemoveQuery(std::get<NHdrf::TQueryId>(query->GetId())));
UNIT_ASSERT_EXCEPTION(scheduler.AddOrUpdatePool("non-existent", poolId, {}), yexception);
UNIT_ASSERT_EXCEPTION(scheduler.AddOrUpdateQuery("non-existent", poolId, queryId, {}), yexception);
UNIT_ASSERT_EXCEPTION(scheduler.AddOrUpdateQuery(databaseId, "non-existent", queryId, {}), yexception);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/runtime/scheduler/kqp_schedulable_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct TSchedulableTask : public std::enable_shared_from_this<TSchedulableTask>
void IncreaseUsage();
void DecreaseUsage(const TDuration& burstUsage, bool forcedResume);

// Returns parent pool's 'fair-share' - 'usage'
// Returns parent pool's 'fair-share' minus 'usage'
size_t GetSpareUsage() const;

// Account extra usage which doesn't affect scheduling
Expand Down
1 change: 1 addition & 0 deletions ydb/library/services/services.proto
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ enum EServiceKikimr {
KQP_SESSION = 545;
KQP_COMPILE_COMPUTATION_PATTERN_SERVICE = 546;
KQP_WORKLOAD_SERVICE = 547;
KQP_COMPUTE_SCHEDULER = 548;

TABLET_RESOURCE_BROKER = 540;

Expand Down
Loading