@@ -248,9 +248,6 @@ private class DataNodeRequestExecutor {
248
248
}
249
249
250
250
void start () {
251
- parentTask .addListener (
252
- () -> exchangeService .finishSinkHandler (request .sessionId (), new TaskCancelledException (parentTask .getReasonCancelled ()))
253
- );
254
251
runBatch (0 );
255
252
}
256
253
@@ -419,7 +416,12 @@ private void runComputeOnDataNode(
419
416
var parentListener = computeListener .acquireAvoid ();
420
417
try {
421
418
// run compute with target shards
419
+ var externalSink = exchangeService .getSinkHandler (externalId );
422
420
var internalSink = exchangeService .createSinkHandler (request .sessionId (), request .pragmas ().exchangeBufferSize ());
421
+ task .addListener (() -> {
422
+ exchangeService .finishSinkHandler (externalId , new TaskCancelledException (task .getReasonCancelled ()));
423
+ exchangeService .finishSinkHandler (request .sessionId (), new TaskCancelledException (task .getReasonCancelled ()));
424
+ });
423
425
DataNodeRequestExecutor dataNodeRequestExecutor = new DataNodeRequestExecutor (
424
426
request ,
425
427
task ,
@@ -431,10 +433,6 @@ private void runComputeOnDataNode(
431
433
);
432
434
dataNodeRequestExecutor .start ();
433
435
// run the node-level reduction
434
- var externalSink = exchangeService .getSinkHandler (externalId );
435
- task .addListener (
436
- () -> exchangeService .finishSinkHandler (externalId , new TaskCancelledException (task .getReasonCancelled ()))
437
- );
438
436
var exchangeSource = new ExchangeSourceHandler (1 , esqlExecutor );
439
437
exchangeSource .addRemoteSink (internalSink ::fetchPageAsync , true , () -> {}, 1 , ActionListener .noop ());
440
438
var reductionListener = computeListener .acquireCompute ();
0 commit comments