Skip to content

Commit f22f59b

Browse files
committed
Cleaning up the shutdown fix for issue bitcoin-dev-project#222
1 parent db0d1c6 commit f22f59b

File tree

2 files changed

+82
-90
lines changed

2 files changed

+82
-90
lines changed

sim-cli/src/main.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,7 @@ async fn main() -> anyhow::Result<()> {
224224

225225
ctrlc::set_handler(move || {
226226
log::info!("Shutting down simulation.");
227-
// Abort or shutdown??
228-
// sim2.abort()
229-
sim2.shutdown();
227+
sim2.abort();
230228
})?;
231229

232230
sim.run().await?;

simln-lib/src/lib.rs

Lines changed: 81 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,7 @@ pub struct Simulation {
522522
shutdown_trigger: Trigger,
523523
shutdown_listener: Listener,
524524
abort_trigger: Trigger,
525-
abort_listener: Listener
525+
abort_listener: Listener,
526526
}
527527

528528
#[derive(Clone)]
@@ -559,7 +559,7 @@ impl Simulation {
559559
shutdown_trigger,
560560
shutdown_listener,
561561
abort_trigger,
562-
abort_listener
562+
abort_listener,
563563
}
564564
}
565565

@@ -661,94 +661,94 @@ impl Simulation {
661661
);
662662
let mut tasks = JoinSet::new();
663663

664-
{ // A block to control the scope of consumer_channels and event_sender. These need to go out of scope so that receivers will close.
665-
666-
// Before we start the simulation up, start tasks that will be responsible for gathering simulation data.
667-
// The event channels are shared across our functionality:
668-
// - Event Sender: used by the simulation to inform data reporting that it needs to start tracking the
669-
// final result of the event that it has taken.
670-
// - Event Receiver: used by data reporting to receive events that have been simulated that need to be
671-
// tracked and recorded.
672-
let (event_sender, event_receiver) = channel(1);
673-
self.run_data_collection(event_receiver, &mut tasks);
674-
675-
// Get an execution kit per activity that we need to generate and spin up consumers for each source node.
676-
let activities = match self.activity_executors().await {
677-
Ok(a) => a,
678-
Err(e) => {
679-
// If we encounter an error while setting up the activity_executors,
680-
// we need to shutdown and wait for tasks to finish. We have started background tasks in the
681-
// run_data_collection function, so we should shut those down before returning.
682-
// The tasks started in run_data_collection are listening for the abort trigger.
683-
self.abort_trigger.trigger();
684-
while let Some(res) = tasks.join_next().await {
685-
if let Err(e) = res {
686-
log::error!("Task exited with error: {e}.");
687-
}
688-
}
689-
return Err(e);
690-
},
691-
};
692-
let consumer_channels = self.dispatch_consumers(
693-
activities
694-
.iter()
695-
.map(|generator| generator.source_info.pubkey)
696-
.collect(),
697-
event_sender.clone(),
698-
&mut tasks,
699-
);
700-
701-
// Next, we'll spin up our actual producers that will be responsible for triggering the configured activity.
702-
// The producers will use their own JoinSet so that the simulation can be shutdown if they all finish.
703-
let mut producer_tasks = JoinSet::new();
704-
match self
705-
.dispatch_producers(activities, consumer_channels, &mut producer_tasks)
706-
.await
707664
{
708-
Ok(_) => {},
709-
Err(e) => {
710-
// If we encounter an error in dispatch_producers, we need to shutdown and wait for tasks to finish.
711-
// We have started background tasks in the run_data_collection function,
712-
// so we should shut those down before returning.
713-
self.shutdown();
714-
while let Some(res) = tasks.join_next().await {
715-
if let Err(e) = res {
716-
log::error!("Task exited with error: {e}.");
665+
// A block to control the scope of consumer_channels and event_sender. These need to go out of scope so that receivers will close.
666+
667+
// Before we start the simulation up, start tasks that will be responsible for gathering simulation data.
668+
// The event channels are shared across our functionality:
669+
// - Event Sender: used by the simulation to inform data reporting that it needs to start tracking the
670+
// final result of the event that it has taken.
671+
// - Event Receiver: used by data reporting to receive events that have been simulated that need to be
672+
// tracked and recorded.
673+
let (event_sender, event_receiver) = channel(1);
674+
self.run_data_collection(event_receiver, &mut tasks);
675+
676+
// Get an execution kit per activity that we need to generate and spin up consumers for each source node.
677+
let activities = match self.activity_executors().await {
678+
Ok(a) => a,
679+
Err(e) => {
680+
// If we encounter an error while setting up the activity_executors,
681+
// we need to shutdown and wait for tasks to finish. We have started background tasks in the
682+
// run_data_collection function, so we should shut those down before returning.
683+
// The tasks started in run_data_collection are listening for the abort trigger.
684+
self.abort_trigger.trigger();
685+
while let Some(res) = tasks.join_next().await {
686+
if let Err(e) = res {
687+
log::error!("Task exited with error: {e}.");
688+
}
717689
}
718-
}
719-
return Err(e);
720-
},
721-
}
690+
return Err(e);
691+
},
692+
};
693+
let consumer_channels = self.dispatch_consumers(
694+
activities
695+
.iter()
696+
.map(|generator| generator.source_info.pubkey)
697+
.collect(),
698+
event_sender.clone(),
699+
&mut tasks,
700+
);
722701

723-
// Start a task that waits for the producers to finish.
724-
// If all producers finish, then there is nothing left to do and the simulation can be shutdown.
725-
let producer_trigger = self.shutdown_trigger.clone();
726-
tasks.spawn(async move {
727-
while let Some(res) = producer_tasks.join_next().await {
728-
if let Err(e) = res {
729-
log::error!("Producer exited with error: {e}.");
730-
}
702+
// Next, we'll spin up our actual producers that will be responsible for triggering the configured activity.
703+
// The producers will use their own JoinSet so that the simulation can be shutdown if they all finish.
704+
let mut producer_tasks = JoinSet::new();
705+
match self
706+
.dispatch_producers(activities, consumer_channels, &mut producer_tasks)
707+
.await
708+
{
709+
Ok(_) => {},
710+
Err(e) => {
711+
// If we encounter an error in dispatch_producers, we need to shutdown and wait for tasks to finish.
712+
// We have started background tasks in the run_data_collection function,
713+
// so we should shut those down before returning.
714+
self.shutdown();
715+
while let Some(res) = tasks.join_next().await {
716+
if let Err(e) = res {
717+
log::error!("Task exited with error: {e}.");
718+
}
719+
}
720+
return Err(e);
721+
},
731722
}
732-
log::info!("All producers finished. Shutting down.");
733-
producer_trigger.trigger()
734-
});
735-
736-
// Start a task that will shutdown the simulation if the total_time is met.
737-
if let Some(total_time) = self.cfg.total_time {
738-
let t = self.shutdown_trigger.clone();
739-
let l = self.shutdown_listener.clone();
740723

724+
// Start a task that waits for the producers to finish.
725+
// If all producers finish, then there is nothing left to do and the simulation can be shutdown.
726+
let producer_trigger = self.shutdown_trigger.clone();
741727
tasks.spawn(async move {
742-
if time::timeout(total_time, l).await.is_err() {
743-
log::info!(
744-
"Simulation run for {}s. Shutting down.",
745-
total_time.as_secs()
746-
);
747-
t.trigger()
728+
while let Some(res) = producer_tasks.join_next().await {
729+
if let Err(e) = res {
730+
log::error!("Producer exited with error: {e}.");
731+
}
748732
}
733+
log::info!("All producers finished. Shutting down.");
734+
producer_trigger.trigger()
749735
});
750-
}
751736

737+
// Start a task that will shutdown the simulation if the total_time is met.
738+
if let Some(total_time) = self.cfg.total_time {
739+
let t = self.shutdown_trigger.clone();
740+
let l = self.shutdown_listener.clone();
741+
742+
tasks.spawn(async move {
743+
if time::timeout(total_time, l).await.is_err() {
744+
log::info!(
745+
"Simulation run for {}s. Shutting down.",
746+
total_time.as_secs()
747+
);
748+
t.trigger()
749+
}
750+
});
751+
}
752752
} // A block to control the scope of consumer_channels and event_sender. These need to go out of scope so that receivers will close.
753753

754754
// We always want to wait for all threads to exit, so we wait for all of them to exit and track any errors
@@ -1056,7 +1056,6 @@ async fn consume_events(
10561056
) -> Result<(), SimulationError> {
10571057
loop {
10581058
select! {
1059-
// Listen for abort or always clean shutdown??
10601059
biased;
10611060
_ = listener.clone() => {
10621061
return Ok(());
@@ -1107,7 +1106,6 @@ async fn consume_events(
11071106
};
11081107

11091108
select!{
1110-
// Listen for abort or always clean shutdown??
11111109
biased;
11121110
_ = listener.clone() => {
11131111
return Ok(())
@@ -1254,7 +1252,6 @@ async fn consume_simulation_results(
12541252

12551253
loop {
12561254
select! {
1257-
// Listen for abort or always clean shutdown??
12581255
biased;
12591256
_ = listener.clone() => {
12601257
writer.map_or(Ok(()), |(ref mut w, _)| w.flush().map_err(|_| {
@@ -1381,7 +1378,6 @@ async fn produce_simulation_results(
13811378

13821379
let result = loop {
13831380
tokio::select! {
1384-
// Listen for abort or always clean shutdown??
13851381
biased;
13861382
_ = listener.clone() => {
13871383
break Ok(())
@@ -1401,7 +1397,6 @@ async fn produce_simulation_results(
14011397
},
14021398
SimulationOutput::SendPaymentFailure(payment, result) => {
14031399
select!{
1404-
// Listen for abort or always clean shutdown??
14051400
_ = listener.clone() => {
14061401
return Ok(());
14071402
},
@@ -1473,7 +1468,6 @@ async fn track_payment_result(
14731468
};
14741469

14751470
select! {
1476-
// Listen for abort or always clean shutdown??
14771471
biased;
14781472
_ = listener.clone() => {
14791473
log::debug!("Track payment result received a shutdown signal.");

0 commit comments

Comments
 (0)