Skip to content

Commit 81e60d1

Browse files
committed
add toxiproxy crate and resiliency tests for async-memcached
1 parent 924d6dc commit 81e60d1

File tree

2 files changed

+310
-0
lines changed

2 files changed

+310
-0
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ futures = "0.3"
2121
tokio = { version = "1.26", default-features = false, features = ["io-util"] }
2222
async-stream = "0.3"
2323
url = "2.5.2"
24+
toxiproxy_rust = "0.1.6"
2425
serial_test = "3.1.1"
2526
fxhash = "0.2.1"
2627

tests/resil.rs

Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
use toxiproxy_rust::{
2+
client::Client as ToxiproxyClient,
3+
proxy::{Proxy, ProxyPack},
4+
};
5+
6+
use std::net::{SocketAddr, ToSocketAddrs};
7+
use std::ops::Deref;
8+
use std::sync::{atomic::AtomicUsize, Once, OnceLock};
9+
10+
static TOXIPROXY_INIT: Once = Once::new();
11+
static TOXI_ADDR: OnceLock<SocketAddr> = OnceLock::new();
12+
static PROXY_PORT: AtomicUsize = AtomicUsize::new(40000);
13+
14+
struct ProxyDrop {
15+
proxy: Proxy,
16+
}
17+
18+
impl Deref for ProxyDrop {
19+
type Target = Proxy;
20+
21+
fn deref(&self) -> &Self::Target {
22+
&self.proxy
23+
}
24+
}
25+
26+
impl Drop for ProxyDrop {
27+
fn drop(&mut self) {
28+
self.proxy.delete().unwrap();
29+
}
30+
}
31+
32+
fn create_proxies_and_configs() -> (Vec<ProxyDrop>, String) {
33+
TOXIPROXY_INIT.call_once(|| {
34+
let mut toxiproxy_url = match std::env::var_os("TOXIPROXY_URL") {
35+
Some(v) => v.into_string().unwrap(),
36+
None => "http://127.0.0.1:8474".to_string(),
37+
};
38+
39+
toxiproxy_url = toxiproxy_url.strip_prefix("http://").unwrap().to_string();
40+
41+
// Create toxiproxy client and populate proxies
42+
let toxi_addr = toxiproxy_url.to_socket_addrs().unwrap().next().unwrap();
43+
let toxiproxy_client = ToxiproxyClient::new(toxi_addr);
44+
toxiproxy_client
45+
.all()
46+
.unwrap()
47+
.iter()
48+
.for_each(|(_, proxy)| proxy.delete().unwrap());
49+
50+
TOXI_ADDR.get_or_init(|| toxi_addr);
51+
});
52+
53+
let local_url = match std::env::var_os("NODE_LOCAL_CACHE") {
54+
Some(v) => v.into_string().unwrap(),
55+
None => "127.0.0.1:11211".to_string(), // use IPV4 so that it resolves to a single Server
56+
};
57+
let local_port = PROXY_PORT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
58+
59+
let toxi_addr = TOXI_ADDR.get().unwrap();
60+
let toxic_local_addr = format!("{}:{}", toxi_addr.ip(), local_port);
61+
62+
let proxies = vec![ProxyPack::new(
63+
format!("local-memcached-{}", local_port),
64+
toxic_local_addr.clone(),
65+
local_url.clone(),
66+
)];
67+
68+
let toxiproxy_client = ToxiproxyClient::new(toxi_addr);
69+
assert!(toxiproxy_client.is_running());
70+
71+
let proxies = toxiproxy_client.populate(proxies).unwrap();
72+
let proxies = proxies
73+
.into_iter()
74+
.map(|proxy| ProxyDrop { proxy })
75+
.collect();
76+
77+
(proxies, toxic_local_addr)
78+
}
79+
80+
#[cfg(test)]
81+
mod tests {
82+
use super::*;
83+
84+
#[ignore = "Relies on a running memcached server and toxiproxy service"]
85+
#[test]
86+
fn test_set_multi_succeeds_with_clean_client() {
87+
let rt = tokio::runtime::Builder::new_multi_thread()
88+
.enable_all()
89+
.build()
90+
.unwrap();
91+
92+
let keys = vec!["clean-key1", "clean-key2", "clean-key3"];
93+
let values = vec!["value1", "value2", "value3"];
94+
let kv: Vec<(&str, &str)> = keys.clone().into_iter().zip(values).collect();
95+
96+
let mut clean_client = rt.block_on(async {
97+
async_memcached::Client::new("tcp://127.0.0.1:11211".to_string())
98+
.await
99+
.unwrap()
100+
});
101+
102+
for key in &keys {
103+
let _ = rt.block_on(async { clean_client.delete(key).await });
104+
let result = rt.block_on(async { clean_client.get(key).await });
105+
assert_eq!(result, Ok(None));
106+
}
107+
108+
let result = rt.block_on(async { clean_client.set_multi(kv, None, None).await });
109+
110+
assert!(result.is_ok());
111+
}
112+
113+
#[ignore = "Relies on a running memcached server and toxiproxy service"]
114+
#[test]
115+
fn test_set_multi_errors_with_toxic_client_via_with_down() {
116+
let rt = tokio::runtime::Builder::new_multi_thread()
117+
.enable_all()
118+
.build()
119+
.unwrap();
120+
121+
let (proxies, toxic_local_addr) = create_proxies_and_configs();
122+
123+
let toxic_local_url = "tcp://".to_string() + &toxic_local_addr;
124+
let toxic_proxy = &proxies[0];
125+
126+
let keys = vec!["with-down-key1", "with-down-key2", "with-down-key3"];
127+
let values = vec!["value1", "value2", "value3"];
128+
let kv: Vec<(&str, &str)> = keys.clone().into_iter().zip(values).collect();
129+
130+
let mut toxic_client =
131+
rt.block_on(async { async_memcached::Client::new(toxic_local_url).await.unwrap() });
132+
133+
for key in &keys {
134+
let _ = rt.block_on(async { toxic_client.delete(key).await });
135+
let result = rt.block_on(async { toxic_client.get(key).await });
136+
assert_eq!(result, Ok(None));
137+
}
138+
139+
let _ = toxic_proxy.with_down(|| {
140+
rt.block_on(async {
141+
let result = toxic_client.set_multi(kv, None, None).await;
142+
assert_eq!(
143+
result,
144+
Err(async_memcached::Error::Io(
145+
std::io::ErrorKind::UnexpectedEof.into()
146+
))
147+
);
148+
});
149+
});
150+
}
151+
152+
#[ignore = "Relies on a running memcached server and toxiproxy service"]
153+
#[test]
154+
fn test_set_multi_errors_on_upstream_with_toxic_client_via_limit_data() {
155+
let rt = tokio::runtime::Builder::new_multi_thread()
156+
.enable_all()
157+
.build()
158+
.unwrap();
159+
160+
let (proxies, toxic_local_addr) = create_proxies_and_configs();
161+
162+
let toxic_local_url = "tcp://".to_string() + &toxic_local_addr;
163+
let toxic_proxy = &proxies[0];
164+
165+
let keys = vec!["upstream-key1", "upstream-key2", "upstream-key3"];
166+
let values = vec!["value1", "value2", "value3"];
167+
168+
let multiset_command =
169+
keys.iter()
170+
.zip(values.iter())
171+
.fold(String::new(), |mut acc, (key, value)| {
172+
acc.push_str(&format!("set {} 0 0 {}\r\n{}\r\n", key, value.len(), value));
173+
acc
174+
});
175+
176+
let mut clean_client = rt.block_on(async {
177+
async_memcached::Client::new("tcp://127.0.0.1:11211".to_string())
178+
.await
179+
.unwrap()
180+
});
181+
182+
let mut toxic_client =
183+
rt.block_on(async { async_memcached::Client::new(toxic_local_url).await.unwrap() });
184+
185+
for key in &keys {
186+
let _ = rt.block_on(async { toxic_client.delete(key).await });
187+
let result = rt.block_on(async { toxic_client.get(key).await });
188+
assert_eq!(result, Ok(None));
189+
}
190+
191+
// Simulate a network error happening when the client makes a request to the server. Only part of the request is received by the server.
192+
// In this case, the server can only cache values for the keys with complete commands.
193+
194+
let byte_limit = multiset_command.len() - 10; // First two commands should be intact, last one cut off
195+
196+
let _ = toxic_proxy
197+
.with_limit_data("upstream".into(), byte_limit as u32, 1.0)
198+
.apply(|| {
199+
rt.block_on(async {
200+
let kv: Vec<(&str, &str)> =
201+
keys.clone().into_iter().zip(values.clone()).collect();
202+
let result = toxic_client.set_multi(kv.clone(), None, None).await;
203+
204+
assert_eq!(
205+
result,
206+
Err(async_memcached::Error::Io(
207+
std::io::ErrorKind::UnexpectedEof.into()
208+
))
209+
);
210+
});
211+
});
212+
213+
// Use a clean client to check that the first two keys were stored and last was not
214+
let get_result = rt.block_on(async { clean_client.get("upstream-key1").await });
215+
assert!(matches!(
216+
std::str::from_utf8(
217+
&get_result
218+
.expect("should have unwrapped a Result")
219+
.expect("should have unwrapped an Option")
220+
.data
221+
)
222+
.expect("failed to parse string from bytes"),
223+
"value1"
224+
));
225+
226+
let get_result = rt.block_on(async { clean_client.get("upstream-key2").await });
227+
assert!(matches!(
228+
std::str::from_utf8(
229+
&get_result
230+
.expect("should have unwrapped a Result")
231+
.expect("should have unwrapped an Option")
232+
.data
233+
)
234+
.expect("failed to parse string from bytes"),
235+
"value2"
236+
));
237+
238+
let get_result = rt.block_on(async { clean_client.get("upstream-key3").await });
239+
assert_eq!(get_result, Ok(None));
240+
}
241+
242+
#[ignore = "Relies on a running memcached server and toxiproxy service"]
243+
#[test]
244+
fn test_set_multi_errors_on_downstream_with_toxic_client_via_limit_data() {
245+
let rt = tokio::runtime::Builder::new_multi_thread()
246+
.enable_all()
247+
.build()
248+
.unwrap();
249+
250+
let (proxies, toxic_local_addr) = create_proxies_and_configs();
251+
252+
let toxic_local_url = "tcp://".to_string() + &toxic_local_addr;
253+
let toxic_proxy = &proxies[0];
254+
let keys = vec!["downstream-key1", "downstream-key2", "downstream-key3"];
255+
let values = vec!["value1", "value2", "value3"];
256+
257+
let mut clean_client = rt.block_on(async {
258+
async_memcached::Client::new("tcp://127.0.0.1:11211".to_string())
259+
.await
260+
.unwrap()
261+
});
262+
263+
let mut toxic_client =
264+
rt.block_on(async { async_memcached::Client::new(toxic_local_url).await.unwrap() });
265+
266+
for key in &keys {
267+
let _ = rt.block_on(async { toxic_client.delete(key).await });
268+
let result = rt.block_on(async { toxic_client.get(key).await });
269+
assert_eq!(result, Ok(None));
270+
}
271+
272+
// Simulate a network error happening when the server responds back to the client. A complete response is received for the first key but then
273+
// the connection is closed before the other responses are received. Regardless, the server should still cache all the data.
274+
let byte_limit = "STORED\r\n".as_bytes().len() + 1;
275+
276+
let _ = toxic_proxy
277+
.with_limit_data("downstream".into(), byte_limit as u32, 1.0)
278+
.apply(|| {
279+
rt.block_on(async {
280+
let kv: Vec<(&str, &str)> =
281+
keys.clone().into_iter().zip(values.clone()).collect();
282+
283+
let set_result = toxic_client.set_multi(kv.clone(), None, None).await;
284+
285+
assert_eq!(
286+
set_result,
287+
Err(async_memcached::Error::Io(
288+
std::io::ErrorKind::UnexpectedEof.into()
289+
))
290+
);
291+
});
292+
});
293+
294+
// Use a clean client to check that all values were cached by the server despite the interrupted server response.
295+
for (key, _expected_value) in keys.iter().zip(values.iter()) {
296+
let get_result = rt.block_on(async { clean_client.get(key).await });
297+
assert!(matches!(
298+
std::str::from_utf8(
299+
&get_result
300+
.expect("should have unwrapped a Result")
301+
.expect("should have unwrapped an Option")
302+
.data
303+
)
304+
.expect("failed to parse string from bytes"),
305+
_expected_value
306+
));
307+
}
308+
}
309+
}

0 commit comments

Comments
 (0)