@@ -17,6 +17,7 @@ Y_UNIT_TEST_SUITE(KqpExecuter) {
1717 - Start query execution and receive TEvTxRequest.
1818 - When sending TEvAddQuery from executer to scheduler, immediately receive TEvAbortExecution.
1919 - Imitate receiving TEvQueryResponse before receiving self TEvPoison by executer.
20+ - Check that scheduler got TEvRemoveQuery.
2021 - Do not crash or get undefined behavior.
2122 */
2223 Y_UNIT_TEST (TestSuddenAbortAfterReady) {
@@ -27,27 +28,18 @@ Y_UNIT_TEST_SUITE(KqpExecuter) {
2728 auto db = kikimr.RunCall ([&] { return kikimr.GetTableClient (); } );
2829 auto session = kikimr.RunCall ([&] { return db.CreateSession ().GetValueSync ().GetSession (); } );
2930
30- auto prepareResult = kikimr.RunCall ([&] { return session.PrepareDataQuery (Q_ (R"(
31- SELECT COUNT(*) FROM `/Root/TwoShard`;
32- )" )).GetValueSync ();
33- });
34- UNIT_ASSERT_VALUES_EQUAL_C (prepareResult.GetStatus (), EStatus::SUCCESS, prepareResult.GetIssues ().ToString ());
35- auto dataQuery = prepareResult.GetQuery ();
36-
3731 TActorId executerId, targetId;
32+ ui8 queries = 0 ;
3833 auto & runtime = *kikimr.GetTestServer ().GetRuntime ();
3934 runtime.SetObserverFunc ([&](TAutoPtr<IEventHandle>& ev) {
40- {
41- TStringStream ss;
42- ss << " Got " << ev->GetTypeName () << " " << ev->Recipient << " " << ev->Sender << Endl;
43- Cerr << ss.Str ();
44- }
35+ Cerr << (TStringBuilder () << " Got " << ev->GetTypeName () << " " << ev->Recipient << " " << ev->Sender << Endl);
4536
4637 if (ev->GetTypeRewrite () == TEvKqpExecuter::TEvTxRequest::EventType) {
4738 targetId = ActorIdFromProto (ev->Get <TEvKqpExecuter::TEvTxRequest>()->Record .GetTarget ());
4839 }
4940
5041 if (ev->GetTypeRewrite () == NScheduler::TEvAddQuery::EventType) {
42+ ++queries;
5143 executerId = ev->Sender ;
5244 auto * abortExecution = new TEvKqp::TEvAbortExecution (NYql::NDqProto::StatusIds::UNSPECIFIED, NYql::TIssues ());
5345 runtime.Send (new IEventHandle (ev->Sender , targetId, abortExecution));
@@ -60,11 +52,16 @@ Y_UNIT_TEST_SUITE(KqpExecuter) {
6052 return TTestActorRuntime::EEventAction::PROCESS;
6153 });
6254
63- auto future = kikimr.RunInThreadPool ([&] { return dataQuery.Execute (TTxControl::BeginTx ().CommitTx (), TExecDataQuerySettings ()).GetValueSync (); });
55+ auto future = kikimr.RunInThreadPool ([&] {
56+ return session.ExecuteDataQuery (" SELECT COUNT(*) FROM `/Root/TwoShard`;" , TTxControl::BeginTx ().CommitTx ()).ExtractValueSync ();
57+ });
6458
6559 TDispatchOptions opts;
6660 opts.FinalEvents .emplace_back ([&](IEventHandle& ev) {
67- return ev.GetTypeRewrite () == TEvKqpExecuter::TEvTxResponse::EventType;
61+ if (ev.GetTypeRewrite () == NScheduler::TEvRemoveQuery::EventType) {
62+ --queries;
63+ }
64+ return (ev.GetTypeRewrite () == TEvKqpExecuter::TEvTxResponse::EventType || ev.GetTypeRewrite () == NScheduler::TEvRemoveQuery::EventType) && !queries;
6865 });
6966 runtime.DispatchEvents (opts);
7067
@@ -141,4 +138,4 @@ Y_UNIT_TEST_SUITE(KqpExecuter) {
141138 */
142139}
143140
144- } // namespace NKikimr
141+ } // namespace NKikimr::NKqp
0 commit comments