
Commit 9e7b871

RUST-1443 Stop executing monitors after their servers have been closed (#733)
1 parent 91bf9f4 commit 9e7b871

6 files changed: +287 -150 lines changed

src/runtime/worker_handle.rs

Lines changed: 15 additions & 8 deletions

@@ -1,10 +1,10 @@
-use tokio::sync::mpsc;
+use tokio::sync::watch;
 
 /// Handle to a worker. Once all handles have been dropped, the worker
 /// will stop waiting for new requests.
 #[derive(Debug, Clone)]
 pub(crate) struct WorkerHandle {
-    _sender: mpsc::Sender<()>,
+    _receiver: watch::Receiver<()>,
 }
 
 impl WorkerHandle {
@@ -18,24 +18,31 @@ impl WorkerHandle {
 /// Listener used to determine when all handles have been dropped.
 #[derive(Debug)]
 pub(crate) struct WorkerHandleListener {
-    receiver: mpsc::Receiver<()>,
+    sender: watch::Sender<()>,
 }
 
 impl WorkerHandleListener {
     /// Listen until all handles are dropped.
     /// This will not return until all handles are dropped, so make sure to only poll this via
     /// select or with a timeout.
-    pub(crate) async fn wait_for_all_handle_drops(&mut self) {
-        self.receiver.recv().await;
+    pub(crate) async fn wait_for_all_handle_drops(&self) {
+        self.sender.closed().await
+    }
+
+    /// Returns whether there are handles still alive.
+    pub(crate) fn is_alive(&self) -> bool {
+        !self.sender.is_closed()
     }
 
     /// Constructs a new channel for for monitoring whether this worker still has references
     /// to it.
     pub(crate) fn channel() -> (WorkerHandle, WorkerHandleListener) {
-        let (sender, receiver) = mpsc::channel(1);
+        let (sender, receiver) = watch::channel(());
         (
-            WorkerHandle { _sender: sender },
-            WorkerHandleListener { receiver },
+            WorkerHandle {
+                _receiver: receiver,
+            },
+            WorkerHandleListener { sender },
        )
    }
}
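A note on the pattern above: the handle/listener pair now rides on tokio's watch channel, whose Sender::closed() future resolves only once every Receiver clone has been dropped and whose Sender::is_closed() gives a synchronous liveness check through &self (the old mpsc-based version needed &mut self to poll recv()). A minimal, self-contained sketch of that mechanism, independent of the driver's types:

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // The receiver plays the role of WorkerHandle: cloning it keeps the
    // worker referenced; the sender plays the role of WorkerHandleListener.
    let (sender, handle) = watch::channel(());

    let extra_handle = handle.clone();
    assert!(!sender.is_closed()); // handles still alive

    drop(handle);
    assert!(!sender.is_closed()); // the clone keeps the worker alive

    drop(extra_handle);
    sender.closed().await; // resolves immediately: every handle is gone
    assert!(sender.is_closed());
}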

src/sdam/description/topology/mod.rs

Lines changed: 14 additions & 7 deletions

@@ -126,7 +126,7 @@ impl PartialEq for TopologyDescription {
 }
 
 impl TopologyDescription {
-    pub(crate) fn new(options: ClientOptions) -> crate::error::Result<Self> {
+    pub(crate) fn initialized(options: &ClientOptions) -> crate::error::Result<Self> {
         verify_max_staleness(
             options
                 .selection_criteria
@@ -146,8 +146,13 @@
 
         let servers: HashMap<_, _> = options
             .hosts
-            .into_iter()
-            .map(|address| (address.clone(), ServerDescription::new(address, None)))
+            .iter()
+            .map(|address| {
+                (
+                    address.clone(),
+                    ServerDescription::new(address.clone(), None),
+                )
+            })
             .collect();
 
         let session_support_status = if topology_type == TopologyType::LoadBalanced {
@@ -164,9 +169,9 @@
         };
 
         Ok(Self {
-            single_seed: servers.len() == 1,
+            single_seed: options.hosts.len() == 1,
             topology_type,
-            set_name: options.repl_set_name,
+            set_name: options.repl_set_name.clone(),
             max_set_version: None,
             max_election_id: None,
             compatibility_error: None,
@@ -435,11 +440,13 @@
             _ => None,
         });
 
-        Some(TopologyDescriptionDiff {
+        let diff = TopologyDescriptionDiff {
             removed_addresses: addresses.difference(&other_addresses).cloned().collect(),
             added_addresses: other_addresses.difference(&addresses).cloned().collect(),
             changed_servers: changed_servers.collect(),
-        })
+        };
+
+        Some(diff)
     }
 
     /// Syncs the set of servers in the description to those in `hosts`. Servers in the set not

src/sdam/monitor.rs

Lines changed: 20 additions & 8 deletions

@@ -20,7 +20,7 @@ use crate::{
     },
     hello::{hello_command, run_hello, HelloReply},
     options::{ClientOptions, ServerAddress},
-    runtime,
+    runtime::{self, WorkerHandle, WorkerHandleListener},
 };
 
 pub(crate) const DEFAULT_HEARTBEAT_FREQUENCY: Duration = Duration::from_secs(10);
@@ -36,6 +36,7 @@ pub(crate) struct Monitor {
     sdam_event_emitter: Option<SdamEventEmitter>,
     update_request_receiver: TopologyCheckRequestReceiver,
     client_options: ClientOptions,
+    server_handle_listener: WorkerHandleListener,
 }
 
 impl Monitor {
@@ -46,8 +47,9 @@
         sdam_event_emitter: Option<SdamEventEmitter>,
         update_request_receiver: TopologyCheckRequestReceiver,
         client_options: ClientOptions,
-    ) {
+    ) -> WorkerHandle {
         let handshaker = Handshaker::new(Some(client_options.clone().into()));
+        let (handle, server_handle_listener) = WorkerHandleListener::channel();
         let monitor = Self {
             address,
             client_options,
@@ -57,8 +59,14 @@
             sdam_event_emitter,
             update_request_receiver,
             connection: None,
+            server_handle_listener,
         };
-        runtime::execute(monitor.execute())
+        runtime::execute(monitor.execute());
+        handle
+    }
+
+    fn is_alive(&self) -> bool {
+        self.server_handle_listener.is_alive()
     }
 
     async fn execute(mut self) {
@@ -67,7 +75,7 @@ impl Monitor {
            .heartbeat_freq
            .unwrap_or(DEFAULT_HEARTBEAT_FREQUENCY);
 
-        while self.topology_watcher.is_alive() {
+        while self.is_alive() {
            self.check_server().await;
 
            #[cfg(test)]
@@ -81,10 +89,14 @@ impl Monitor {
            #[cfg(not(test))]
            let min_frequency = MIN_HEARTBEAT_FREQUENCY;
 
-            runtime::delay_for(min_frequency).await;
-            self.update_request_receiver
-                .wait_for_check_request(heartbeat_frequency - min_frequency)
-                .await;
+            if self.is_alive() {
+                runtime::delay_for(min_frequency).await;
+            }
+            if self.is_alive() {
+                self.update_request_receiver
+                    .wait_for_check_request(heartbeat_frequency - min_frequency)
+                    .await;
+            }
        }
    }
 
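The loop above re-tests liveness before each wait, so a monitor whose server has been removed skips any further sleeping and falls out of the loop at the next check instead of sleeping through the full heartbeat interval. A rough, self-contained illustration of that shutdown shape (the monitor_loop name and timings are invented for this example, not the driver's code):

use std::time::Duration;
use tokio::sync::watch;

// Keep "checking" only while at least one handle (a watch::Receiver) is still
// alive, re-testing liveness before each sleep so the task winds down promptly
// after the last handle is dropped.
async fn monitor_loop(listener: watch::Sender<()>) {
    let mut checks = 0;
    while !listener.is_closed() {
        checks += 1;
        println!("check #{checks}");
        if !listener.is_closed() {
            tokio::time::sleep(Duration::from_millis(200)).await;
        }
    }
    println!("all handles dropped; stopping after {checks} checks");
}

#[tokio::main]
async fn main() {
    let (listener, handle) = watch::channel(());
    let task = tokio::spawn(monitor_loop(listener));

    tokio::time::sleep(Duration::from_millis(700)).await;
    drop(handle); // dropping the last handle ends the loop
    task.await.unwrap();
}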
src/sdam/test.rs

Lines changed: 93 additions & 0 deletions

@@ -1,4 +1,5 @@
 use std::{
+    collections::HashSet,
     sync::Arc,
     time::{Duration, Instant},
 };
@@ -8,7 +9,10 @@ use semver::VersionReq;
 use tokio::sync::{RwLockReadGuard, RwLockWriteGuard};
 
 use crate::{
+    client::options::{ClientOptions, ServerAddress},
+    cmap::RawCommandResponse,
     error::{Error, ErrorKind},
+    event::sdam::SdamEventHandler,
     hello::{LEGACY_HELLO_COMMAND_NAME, LEGACY_HELLO_COMMAND_NAME_LOWERCASE},
     runtime,
     test::{
@@ -28,6 +32,8 @@ use crate::{
     Client,
 };
 
+use super::{ServerDescription, Topology};
+
 #[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))]
 #[cfg_attr(feature = "async-std-runtime", async_std::test)]
 async fn min_heartbeat_frequency() {
@@ -485,3 +491,90 @@ async fn repl_set_name_mismatch() -> crate::error::Result<()> {
 
     Ok(())
 }
+
+/// Test verifying that a server's monitor stops after the server has been removed from the
+/// topology.
+#[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))]
+#[cfg_attr(feature = "async-std-runtime", async_std::test)]
+async fn removed_server_monitor_stops() -> crate::error::Result<()> {
+    let _guard = LOCK.run_concurrently().await;
+
+    let handler = Arc::new(EventHandler::new());
+    let options = ClientOptions::builder()
+        .hosts(vec![
+            ServerAddress::parse("localhost:49152")?,
+            ServerAddress::parse("localhost:49153")?,
+            ServerAddress::parse("localhost:49154")?,
+        ])
+        .heartbeat_freq(Duration::from_millis(50))
+        .sdam_event_handler(handler.clone() as Arc<dyn SdamEventHandler>)
+        .repl_set_name("foo".to_string())
+        .build();
+
+    let hosts = options.hosts.clone();
+    let set_name = options.repl_set_name.clone().unwrap();
+
+    let mut subscriber = handler.subscribe();
+    let topology = Topology::new(options)?;
+
+    // Wait until all three monitors have started.
+    let mut seen_monitors = HashSet::new();
+    subscriber
+        .wait_for_event(Duration::from_millis(500), |event| {
+            if let Event::Sdam(SdamEvent::ServerHeartbeatStarted(e)) = event {
+                seen_monitors.insert(e.server_address.clone());
+            }
+            seen_monitors.len() == hosts.len()
+        })
+        .await
+        .expect("should see all three monitors start");
+
+    // Remove the third host from the topology.
+    let hello = doc! {
+        "ok": 1,
+        "isWritablePrimary": true,
+        "hosts": [
+            hosts[0].clone().to_string(),
+            hosts[1].clone().to_string(),
+        ],
+        "me": hosts[0].clone().to_string(),
+        "setName": set_name,
+        "maxBsonObjectSize": 1234,
+        "maxWriteBatchSize": 1234,
+        "maxMessageSizeBytes": 1234,
+        "minWireVersion": 0,
+        "maxWireVersion": 13,
+    };
+    let hello_reply = Some(Ok(RawCommandResponse::with_document_and_address(
+        hosts[0].clone(),
+        hello,
+    )
+    .unwrap()
+    .into_hello_reply(Duration::from_millis(10))
+    .unwrap()));
+
+    topology
+        .clone_updater()
+        .update(ServerDescription::new(hosts[0].clone(), hello_reply))
+        .await;
+
+    subscriber.wait_for_event(Duration::from_secs(1), |event| {
+        matches!(event, Event::Sdam(SdamEvent::ServerClosed(e)) if e.address == hosts[2])
+    }).await.expect("should see server closed event");
+
+    // Capture heartbeat events for 1 second. The monitor for the removed server should stop
+    // publishing them.
+    let events = subscriber.collect_events(Duration::from_secs(1), |event| {
+        matches!(event, Event::Sdam(SdamEvent::ServerHeartbeatStarted(e)) if e.server_address == hosts[2])
+    }).await;
+
+    // Use 3 to account for any heartbeats that happen to start between emitting the ServerClosed
+    // event and actually publishing the state with the closed server.
+    assert!(
+        events.len() < 3,
+        "expected monitor for removed server to stop performing checks, but saw {} heartbeats",
+        events.len()
+    );
+
+    Ok(())
+}
