@@ -325,7 +325,9 @@ impl Daemon {
325
325
}
326
326
Event :: CtrlC => {
327
327
for dataflow in self . running . values_mut ( ) {
328
- dataflow. stop_all ( & self . clock , None ) . await ;
328
+ dataflow
329
+ . stop_all ( & mut self . coordinator_connection , & self . clock , None )
330
+ . await ?;
329
331
}
330
332
}
331
333
}
@@ -496,7 +498,13 @@ impl Daemon {
496
498
. send ( Some ( reply) )
497
499
. map_err ( |_| error ! ( "could not send stop reply from daemon to coordinator" ) ) ;
498
500
499
- dataflow. stop_all ( & self . clock , grace_duration) . await ;
501
+ dataflow
502
+ . stop_all (
503
+ & mut self . coordinator_connection ,
504
+ & self . clock ,
505
+ grace_duration,
506
+ )
507
+ . await ?;
500
508
RunStatus :: Continue
501
509
}
502
510
DaemonCoordinatorEvent :: Destroy => {
@@ -640,6 +648,10 @@ impl Daemon {
640
648
if local {
641
649
dataflow. pending_nodes . insert ( node. id . clone ( ) ) ;
642
650
651
+ if node. kind . dynamic ( ) {
652
+ dataflow. dynamic_nodes . insert ( node. id . clone ( ) ) ;
653
+ }
654
+
643
655
let node_id = node. id . clone ( ) ;
644
656
let node_stderr_most_recent = dataflow
645
657
. node_stderr_most_recent
@@ -1464,6 +1476,12 @@ pub struct RunningDataflow {
1464
1476
open_inputs : BTreeMap < NodeId , BTreeSet < DataId > > ,
1465
1477
running_nodes : BTreeMap < NodeId , RunningNode > ,
1466
1478
1479
+ /// List of all dynamic node IDs.
1480
+ ///
1481
+ /// We want to treat dynamic nodes differently in some cases, so we need
1482
+ /// to know which nodes are dynamic.
1483
+ dynamic_nodes : BTreeSet < NodeId > ,
1484
+
1467
1485
open_external_mappings : HashMap < OutputId , BTreeMap < String , BTreeSet < InputId > > > ,
1468
1486
1469
1487
pending_drop_tokens : HashMap < DropToken , DropTokenInformation > ,
@@ -1495,6 +1513,7 @@ impl RunningDataflow {
1495
1513
timers : BTreeMap :: new ( ) ,
1496
1514
open_inputs : BTreeMap :: new ( ) ,
1497
1515
running_nodes : BTreeMap :: new ( ) ,
1516
+ dynamic_nodes : BTreeSet :: new ( ) ,
1498
1517
open_external_mappings : HashMap :: new ( ) ,
1499
1518
pending_drop_tokens : HashMap :: new ( ) ,
1500
1519
_timer_handles : Vec :: new ( ) ,
@@ -1559,7 +1578,21 @@ impl RunningDataflow {
1559
1578
Ok ( ( ) )
1560
1579
}
1561
1580
1562
- async fn stop_all ( & mut self , clock : & HLC , grace_duration : Option < Duration > ) {
1581
+ async fn stop_all (
1582
+ & mut self ,
1583
+ coordinator_connection : & mut Option < TcpStream > ,
1584
+ clock : & HLC ,
1585
+ grace_duration : Option < Duration > ,
1586
+ ) -> eyre:: Result < ( ) > {
1587
+ self . pending_nodes
1588
+ . handle_dataflow_stop (
1589
+ coordinator_connection,
1590
+ clock,
1591
+ & mut self . cascading_error_causes ,
1592
+ & self . dynamic_nodes ,
1593
+ )
1594
+ . await ?;
1595
+
1563
1596
for ( _node_id, channel) in self . subscribe_channels . drain ( ) {
1564
1597
let _ = send_with_timestamp ( & channel, daemon_messages:: NodeEvent :: Stop , clock) ;
1565
1598
}
@@ -1586,6 +1619,7 @@ impl RunningDataflow {
1586
1619
}
1587
1620
} ) ;
1588
1621
self . stop_sent = true ;
1622
+ Ok ( ( ) )
1589
1623
}
1590
1624
1591
1625
fn open_inputs ( & self , node_id : & NodeId ) -> & BTreeSet < DataId > {
0 commit comments