diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5d2ffbc4..d6336353 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,7 +8,10 @@ on: env: CARGO_TERM_COLOR: always - + TOXIPROXY_HOST: http://0.0.0.0 + TOXIPROXY_PORT: 8474 + MEMCACHED_HOST: 127.0.0.1 + MEMCACHED_PORT: 11211 jobs: lint: runs-on: ubuntu-latest @@ -35,10 +38,17 @@ jobs: - 11211:11211 steps: - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 + - name: Run toxiproxy service in main environment + run: | + wget -qO toxiproxy_2.9.0.deb https://github.com/Shopify/toxiproxy/releases/download/v2.9.0/toxiproxy_2.9.0_linux_amd64.deb + sudo dpkg -i toxiproxy_2.9.0.deb + toxiproxy-server & - name: Run parser and connection tests run: cargo test --all-features - name: Run integration tests - run: cargo test -- --test-threads=1 --ignored + run: cargo test --test integration_tests -- --test-threads=1 --ignored + - name: Run resiliency tests + run: cargo test --test resiliency_tests -- --test-threads=1 --ignored build: runs-on: ubuntu-latest steps: diff --git a/CHANGELOG.md b/CHANGELOG.md index 3243cdb4..59de169b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - ReleaseDate +### Added +- Added `Toxiproxy` resiliency testing. + ## [0.4.0] - 2024-09-20 ### Added diff --git a/Cargo.toml b/Cargo.toml index 9c2852fa..f41883e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ futures = "0.3" tokio = { version = "1.26", default-features = false, features = ["io-util"] } async-stream = "0.3" url = "2.5.2" +toxiproxy_rust = "0.1.6" fxhash = "0.2.1" [dev-dependencies] diff --git a/tests/resiliency_tests.rs b/tests/resiliency_tests.rs new file mode 100644 index 00000000..971b2dc8 --- /dev/null +++ b/tests/resiliency_tests.rs @@ -0,0 +1,322 @@ +use async_memcached::Client; + +use toxiproxy_rust::{ + client::Client as ToxiproxyClient, + proxy::{Proxy, ProxyPack}, +}; + +use std::net::{SocketAddr, ToSocketAddrs}; +use std::ops::Deref; +use std::sync::{atomic::AtomicUsize, Once, OnceLock}; + +static TOXIPROXY_INIT: Once = Once::new(); +static TOXI_ADDR: OnceLock = OnceLock::new(); +static PROXY_PORT: AtomicUsize = AtomicUsize::new(40000); + +struct ProxyDrop { + proxy: Proxy, +} + +impl Deref for ProxyDrop { + type Target = Proxy; + + fn deref(&self) -> &Self::Target { + &self.proxy + } +} + +impl Drop for ProxyDrop { + fn drop(&mut self) { + self.proxy.delete().unwrap(); + } +} + +fn create_proxy_and_config() -> (ProxyDrop, String) { + TOXIPROXY_INIT.call_once(|| { + let mut toxiproxy_host = match std::env::var_os("TOXIPROXY_HOST") { + Some(v) => v.into_string().unwrap(), + None => "http://127.0.0.1".to_string(), + }; + if let Some(stripped) = toxiproxy_host.strip_prefix("http://") { + toxiproxy_host = stripped.to_string(); + } + + let toxiproxy_port = match std::env::var_os("TOXIPROXY_PORT") { + Some(v) => v.into_string().unwrap(), + None => "8474".to_string(), + }; + + let toxiproxy_url = format!("{}:{}", toxiproxy_host, toxiproxy_port); + + // Create toxiproxy client and populate proxies + let toxi_addr = toxiproxy_url.to_socket_addrs().unwrap().next().unwrap(); + + let toxiproxy_client = ToxiproxyClient::new(toxi_addr); + toxiproxy_client + .all() + .unwrap() + .iter() + .for_each(|(_, proxy)| proxy.delete().unwrap()); + + TOXI_ADDR.get_or_init(|| toxi_addr); + }); + + let mut local_host = match std::env::var_os("MEMCACHED_HOST") { + Some(v) => v.into_string().unwrap(), + None => "http://127.0.0.1".to_string(), // use IPV4 so that it resolves to a single Server + }; + if let Some(stripped) = local_host.strip_prefix("http://") { + local_host = stripped.to_string(); + } + + let local_memcached_port = match std::env::var_os("MEMCACHED_PORT") { + Some(v) => v.into_string().unwrap(), + None => "11211".to_string(), + }; + + let local_url = format!("{}:{}", local_host, local_memcached_port); + + let local_port = PROXY_PORT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + let toxi_addr = TOXI_ADDR.get().unwrap(); + let toxic_local_addr = format!("{}:{}", toxi_addr.ip(), local_port); + + let proxy = ProxyPack::new( + format!("local-memcached-{}", local_port), + toxic_local_addr.clone(), + local_url.clone(), + ); + + let toxiproxy_client = ToxiproxyClient::new(toxi_addr); + assert!(toxiproxy_client.is_running()); + + let proxy = toxiproxy_client.populate(vec![proxy]).unwrap(); + let proxy = proxy + .into_iter() + .map(|proxy| ProxyDrop { proxy }) + .next() + .unwrap(); + + (proxy, toxic_local_addr) +} + +async fn setup_clean_client() -> Client { + let mut local_host = match std::env::var_os("MEMCACHED_HOST") { + Some(v) => v.into_string().unwrap(), + None => "http://127.0.0.1".to_string(), + }; + if let Some(stripped) = local_host.strip_prefix("http://") { + local_host = stripped.to_string(); + } + + let local_memcached_port = match std::env::var_os("MEMCACHED_PORT") { + Some(v) => v.into_string().unwrap(), + None => "11211".to_string(), + }; + + Client::new(format!("tcp://{}:{}", local_host, local_memcached_port,)) + .await + .unwrap() +} + +async fn setup_toxic_client(toxic_local_url: &String) -> Client { + Client::new(toxic_local_url).await.unwrap() +} + +fn setup_runtime() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() +} + +async fn clear_keys(client: &mut Client, keys: &[&str]) { + for key in keys { + let _ = client.delete(key).await; + let result = client.get(key).await; + assert_eq!(result, Ok(None)); + } +} + +fn setup_runtime_and_clients( + toxic_local_url: &String, + keys: &Vec<&str>, +) -> (tokio::runtime::Runtime, Client, Client) { + let rt = setup_runtime(); + let mut clean_client = rt.block_on(setup_clean_client()); + let toxic_client = rt.block_on(setup_toxic_client(toxic_local_url)); + + rt.block_on(clear_keys(&mut clean_client, &keys)); + + (rt, clean_client, toxic_client) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[ignore = "Relies on a running memcached server and toxiproxy service"] + #[test] + fn test_set_multi_succeeds_with_clean_client() { + let rt = setup_runtime(); + + let keys = vec!["clean-key1", "clean-key2", "clean-key3"]; + let values = vec!["value1", "value2", "value3"]; + let kv: Vec<(&str, &str)> = keys.clone().into_iter().zip(values).collect(); + + let mut clean_client = rt.block_on(setup_clean_client()); + + rt.block_on(clear_keys(&mut clean_client, &keys)); + + let result = rt.block_on(async { clean_client.set_multi(&kv, None, None).await }); + + assert!(result.is_ok()); + } + + #[ignore = "Relies on a running memcached server and toxiproxy service"] + #[test] + fn test_set_multi_errors_with_toxic_client_via_with_down() { + let keys = vec!["with-down-key1", "with-down-key2", "with-down-key3"]; + let values = vec!["value1", "value2", "value3"]; + + let (toxic_proxy, toxic_local_addr) = create_proxy_and_config(); + let toxic_local_url = "tcp://".to_string() + &toxic_local_addr; + + let (rt, _, mut toxic_client) = setup_runtime_and_clients(&toxic_local_url, &keys); + + let kv: Vec<(&str, &str)> = keys.clone().into_iter().zip(values).collect(); + + let _ = toxic_proxy.with_down(|| { + rt.block_on(async { + let result = toxic_client.set_multi(&kv, None, None).await; + assert_eq!( + result, + Err(async_memcached::Error::Io( + std::io::ErrorKind::UnexpectedEof.into() + )) + ); + }); + }); + } + + #[ignore = "Relies on a running memcached server and toxiproxy service"] + #[test] + fn test_set_multi_errors_on_upstream_with_toxic_client_via_limit_data() { + let keys = vec!["upstream-key1", "upstream-key2", "upstream-key3"]; + let values = vec!["value1", "value2", "value3"]; + + let (toxic_proxy, toxic_local_addr) = create_proxy_and_config(); + let toxic_local_url = "tcp://".to_string() + &toxic_local_addr; + + let (rt, mut clean_client, mut toxic_client) = + setup_runtime_and_clients(&toxic_local_url, &keys); + + let multiset_command = + keys.iter() + .zip(values.iter()) + .fold(String::new(), |mut acc, (key, value)| { + acc.push_str(&format!("set {} 0 0 {}\r\n{}\r\n", key, value.len(), value)); + acc + }); + + // Simulate a network error happening when the client makes a request to the server. Only part of the request is received by the server. + // In this case, the server can only cache values for the keys with complete commands. + + let byte_limit = multiset_command.len() - 10; // First two commands should be intact, last one cut off + + let _ = toxic_proxy + .with_limit_data("upstream".into(), byte_limit as u32, 1.0) + .apply(|| { + rt.block_on(async { + let kv: Vec<(&str, &str)> = + keys.clone().into_iter().zip(values.clone()).collect(); + let result = toxic_client.set_multi(&kv, None, None).await; + + assert_eq!( + result, + Err(async_memcached::Error::Io( + std::io::ErrorKind::UnexpectedEof.into() + )) + ); + }); + }); + + // Use a clean client to check that the first two keys were stored and last was not + let get_result = rt.block_on(async { clean_client.get("upstream-key1").await }); + assert!(matches!( + std::str::from_utf8( + &get_result + .expect("should have unwrapped a Result") + .expect("should have unwrapped an Option") + .data + ) + .expect("failed to parse string from bytes"), + "value1" + )); + + let get_result = rt.block_on(async { clean_client.get("upstream-key2").await }); + assert!(matches!( + std::str::from_utf8( + &get_result + .expect("should have unwrapped a Result") + .expect("should have unwrapped an Option") + .data + ) + .expect("failed to parse string from bytes"), + "value2" + )); + + let get_result = rt.block_on(async { clean_client.get("upstream-key3").await }); + assert_eq!(get_result, Ok(None)); + } + + #[ignore = "Relies on a running memcached server and toxiproxy service"] + #[test] + fn test_set_multi_errors_on_downstream_with_toxic_client_via_limit_data() { + let keys = vec!["downstream-key1", "downstream-key2", "downstream-key3"]; + let values = vec!["value1", "value2", "value3"]; + + let (toxic_proxy, toxic_local_addr) = create_proxy_and_config(); + let toxic_local_url = "tcp://".to_string() + &toxic_local_addr; + + let (rt, mut clean_client, mut toxic_client) = + setup_runtime_and_clients(&toxic_local_url, &keys); + + // Simulate a network error happening when the server responds back to the client. A complete response is received for the first key but then + // the connection is closed before the other responses are received. Regardless, the server should still cache all the data. + let byte_limit = "STORED\r\n".as_bytes().len() + 1; + + let _ = toxic_proxy + .with_limit_data("downstream".into(), byte_limit as u32, 1.0) + .apply(|| { + rt.block_on(async { + let kv: Vec<(&str, &str)> = + keys.clone().into_iter().zip(values.clone()).collect(); + + let set_result = toxic_client.set_multi(&kv, None, None).await; + + assert_eq!( + set_result, + Err(async_memcached::Error::Io( + std::io::ErrorKind::UnexpectedEof.into() + )) + ); + }); + }); + + // Use a clean client to check that all values were cached by the server despite the interrupted server response. + for (key, _expected_value) in keys.iter().zip(values.iter()) { + let get_result = rt.block_on(async { clean_client.get(key).await }); + assert!(matches!( + std::str::from_utf8( + &get_result + .expect("should have unwrapped a Result") + .expect("should have unwrapped an Option") + .data + ) + .expect("failed to parse string from bytes"), + _expected_value + )); + } + } +}