Skip to content

Commit 0ed89e2

Browse files
authored
RUST-242 Properly handle network errors during operations (#119)
1 parent fbdc830 commit 0ed89e2

File tree

4 files changed

+84
-37
lines changed

4 files changed

+84
-37
lines changed

src/client/executor.rs

Lines changed: 17 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::{
1111
error::Result,
1212
event::command::{CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent},
1313
operation::Operation,
14-
sdam::{update_topology, ServerDescription},
14+
sdam::{handle_post_handshake_error, handle_pre_handshake_error},
1515
};
1616

1717
lazy_static! {
@@ -58,43 +58,26 @@ impl Client {
5858
Some(conn) => self.execute_operation_on_connection(op, conn),
5959
None => {
6060
let server = self.select_server(op.selection_criteria())?;
61-
let mut conn = server.checkout_connection()?;
6261

63-
let result = self.execute_operation_on_connection(op, &mut conn);
64-
65-
// If we encounter certain errors, we must update the topology as per the
66-
// SDAM spec.
67-
if let Err(ref e) = result {
68-
let update = || {
69-
let description =
70-
ServerDescription::new(conn.address().clone(), Some(Err(e.clone())));
71-
update_topology(self.topology(), description);
72-
};
62+
let mut conn = match server.checkout_connection() {
63+
Ok(conn) => conn,
64+
Err(err) => {
65+
handle_pre_handshake_error(
66+
err.clone(),
67+
server.address.clone(),
68+
self.topology(),
69+
);
70+
return Err(err);
71+
}
72+
};
7373

74-
if e.is_non_timeout_network_error() {
75-
update();
76-
} else if e.is_recovering() || e.is_not_master() {
77-
update();
78-
79-
// For "node is recovering" or "not master" errors, we must request a
80-
// topology check.
81-
server.request_topology_check();
82-
83-
let wire_version = conn
84-
.stream_description()
85-
.map(|sd| sd.max_wire_version)
86-
.ok()
87-
.and_then(std::convert::identity)
88-
.unwrap_or(0);
89-
90-
// in 4.2+, we only clear connection pool if we've received a
91-
// "node is shutting down" error. Otherwise, we always clear the pool.
92-
if wire_version < 8 || e.is_shutting_down() {
93-
server.clear_connection_pool();
94-
}
74+
match self.execute_operation_on_connection(op, &mut conn) {
75+
Ok(result) => Ok(result),
76+
Err(err) => {
77+
handle_post_handshake_error(err.clone(), conn, server, self.topology());
78+
Err(err)
9579
}
9680
}
97-
result
9881
}
9982
}
10083
}

src/error.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,13 @@ impl ErrorKind {
189189
}
190190
}
191191

192+
/// Reports whether this error kind is an I/O error.
///
/// Returns `true` only for the `ErrorKind::Io` variant and `false` for every other variant.
pub(crate) fn is_network_error(&self) -> bool {
193+
match self {
194+
ErrorKind::Io(..) => true,
195+
// NOTE(review): the wildcard arm means any variant added later is silently classified as
// "not a network error" — confirm that is the intended default.
_ => false,
196+
}
197+
}
198+
192199
/// Gets the code/message tuple from this error, if applicable. In the case of write errors, the
193200
/// code and message are taken from the write concern error, if there is one.
194201
fn code_and_message(&self) -> Option<(i32, &str)> {

src/sdam/mod.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,17 @@ mod state;
55

66
pub use self::public::{ServerInfo, ServerType};
77

8+
#[cfg(test)]
9+
pub(crate) use self::description::server::ServerDescription;
810
pub(crate) use self::{
9-
description::{server::ServerDescription, topology::TopologyDescription},
11+
description::topology::TopologyDescription,
1012
monitor::MIN_HEARTBEAT_FREQUENCY,
11-
state::{server::Server, update_topology, Topology, TopologyUpdateCondvar},
13+
state::{
14+
handle_post_handshake_error,
15+
handle_pre_handshake_error,
16+
server::Server,
17+
update_topology,
18+
Topology,
19+
TopologyUpdateCondvar,
20+
},
1221
};

src/sdam/state/mod.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use self::server::Server;
1212
use super::TopologyDescription;
1313
use crate::{
1414
cmap::{Command, Connection},
15-
error::Result,
15+
error::{Error, Result},
1616
options::{ClientOptions, StreamAddress},
1717
sdam::{
1818
description::server::{ServerDescription, ServerType},
@@ -175,6 +175,54 @@ impl Topology {
175175
}
176176
}
177177

178+
/// Reacts to an error that occurred before an operation could be run on a connection.
///
/// If `error.is_network_error()` is true, the server at `address` is passed to
/// `mark_server_as_unknown` together with the error and `topology`. Any other error is
/// ignored and the topology is left untouched.
///
/// All three arguments are taken by value, so callers that still need the error afterwards
/// must pass a clone.
// NOTE(review): unlike the post-handshake handler, this path does not clear the server's
// connection pool — confirm against the SDAM spec whether that is intended.
pub(crate) fn handle_pre_handshake_error(
179+
error: Error,
180+
address: StreamAddress,
181+
topology: Arc<RwLock<Topology>>,
182+
) {
183+
if error.is_network_error() {
184+
mark_server_as_unknown(error, address, topology);
185+
}
186+
}
187+
188+
/// Reacts to an error returned while executing an operation on a checked-out connection.
///
/// Two classes of error trigger a topology update; every other error is ignored:
///
/// * Non-timeout network errors: the server is passed to `mark_server_as_unknown` and its
///   connection pool is cleared.
/// * "Node is recovering" / "not master" errors: the server is passed to
///   `mark_server_as_unknown`, a topology check is requested, and the pool is cleared only
///   when the connection's max wire version is below 8 or the error is a "node is shutting
///   down" error.
///
/// `conn` is taken by value and is only read (for its stream description); it is dropped
/// when this function returns.
// NOTE(review): presumably dropping `conn` after `clear_connection_pool` is what keeps a
// broken connection from being reused — confirm against the pool's `Drop` behavior.
pub(crate) fn handle_post_handshake_error(
189+
error: Error,
190+
conn: Connection,
191+
server: Arc<Server>,
192+
topology: Arc<RwLock<Topology>>,
193+
) {
194+
// If we encounter certain errors, we must update the topology as per the
195+
// SDAM spec.
196+
if error.is_non_timeout_network_error() {
197+
mark_server_as_unknown(error, server.address.clone(), topology);
198+
server.clear_connection_pool();
199+
} else if error.is_recovering() || error.is_not_master() {
200+
// `error` is cloned here because it is inspected again below (`is_shutting_down`).
mark_server_as_unknown(error.clone(), server.address.clone(), topology);
201+
202+
// For "node is recovering" or "not master" errors, we must request a
203+
// topology check.
204+
server.request_topology_check();
205+
206+
// Falls back to 0 when the stream description cannot be obtained or carries no max
// wire version; 0 is below the threshold checked below, so the pool is cleared then.
let wire_version = conn
207+
.stream_description()
208+
.map(|sd| sd.max_wire_version)
209+
.ok()
210+
.and_then(std::convert::identity)
211+
.unwrap_or(0);
212+
213+
// In 4.2+ (treated below as max wire version 8 or higher), we only clear the
214+
// connection pool if we've received a "node is shutting down" error. Otherwise, we
// always clear the pool.
215+
if wire_version < 8 || error.is_shutting_down() {
216+
server.clear_connection_pool();
217+
}
218+
}
219+
}
220+
221+
/// Replaces the topology's view of the server at `address` with a description built from
/// `error`.
///
/// Constructs a `ServerDescription` from `address` and `Some(Err(error))`, then hands it to
/// `update_topology` along with `topology`.
// NOTE(review): presumably a description constructed from an `Err` is treated as an Unknown
// server — confirm in `ServerDescription::new`.
fn mark_server_as_unknown(error: Error, address: StreamAddress, topology: Arc<RwLock<Topology>>) {
222+
let description = ServerDescription::new(address, Some(Err(error)));
223+
update_topology(topology, description);
224+
}
225+
178226
/// Updates the provided topology in a minimally contentious way by cloning first.
179227
pub(crate) fn update_topology(
180228
topology: Arc<RwLock<Topology>>,

0 commit comments

Comments
 (0)