Skip to content

Add toxiproxy resiliency testing #39

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ on:

env:
CARGO_TERM_COLOR: always

TOXIPROXY_HOST: http://0.0.0.0
TOXIPROXY_PORT: 8474
MEMCACHED_HOST: 127.0.0.1
MEMCACHED_PORT: 11211
jobs:
lint:
runs-on: ubuntu-latest
Expand All @@ -35,10 +38,17 @@ jobs:
- 11211:11211
steps:
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
- name: Run toxiproxy service in main environment
run: |
wget -qO toxiproxy_2.9.0.deb https://github.com/Shopify/toxiproxy/releases/download/v2.9.0/toxiproxy_2.9.0_linux_amd64.deb
sudo dpkg -i toxiproxy_2.9.0.deb
toxiproxy-server &
- name: Run parser and connection tests
run: cargo test --all-features
- name: Run integration tests
run: cargo test -- --test-threads=1 --ignored
run: cargo test --test integration_tests -- --test-threads=1 --ignored
- name: Run resiliency tests
run: cargo test --test resiliency_tests -- --test-threads=1 --ignored
build:
runs-on: ubuntu-latest
steps:
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased] - ReleaseDate

### Added
- Added `Toxiproxy` resiliency testing.

## [0.4.0] - 2024-09-20

### Added
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ futures = "0.3"
tokio = { version = "1.26", default-features = false, features = ["io-util"] }
async-stream = "0.3"
url = "2.5.2"
toxiproxy_rust = "0.1.6"
fxhash = "0.2.1"

[dev-dependencies]
Expand Down
322 changes: 322 additions & 0 deletions tests/resiliency_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,322 @@
use async_memcached::Client;

use toxiproxy_rust::{
client::Client as ToxiproxyClient,
proxy::{Proxy, ProxyPack},
};

use std::net::{SocketAddr, ToSocketAddrs};
use std::ops::Deref;
use std::sync::{atomic::AtomicUsize, Once, OnceLock};

static TOXIPROXY_INIT: Once = Once::new();
static TOXI_ADDR: OnceLock<SocketAddr> = OnceLock::new();
static PROXY_PORT: AtomicUsize = AtomicUsize::new(40000);

struct ProxyDrop {
proxy: Proxy,
}

impl Deref for ProxyDrop {
type Target = Proxy;

fn deref(&self) -> &Self::Target {
&self.proxy
}
}

impl Drop for ProxyDrop {
fn drop(&mut self) {
self.proxy.delete().unwrap();
}
}

fn create_proxy_and_config() -> (ProxyDrop, String) {
TOXIPROXY_INIT.call_once(|| {
let mut toxiproxy_host = match std::env::var_os("TOXIPROXY_HOST") {
Some(v) => v.into_string().unwrap(),
None => "http://127.0.0.1".to_string(),
};
if let Some(stripped) = toxiproxy_host.strip_prefix("http://") {
toxiproxy_host = stripped.to_string();
}

let toxiproxy_port = match std::env::var_os("TOXIPROXY_PORT") {
Some(v) => v.into_string().unwrap(),
None => "8474".to_string(),
};

let toxiproxy_url = format!("{}:{}", toxiproxy_host, toxiproxy_port);

// Create toxiproxy client and populate proxies
let toxi_addr = toxiproxy_url.to_socket_addrs().unwrap().next().unwrap();

let toxiproxy_client = ToxiproxyClient::new(toxi_addr);
toxiproxy_client
.all()
.unwrap()
.iter()
.for_each(|(_, proxy)| proxy.delete().unwrap());

TOXI_ADDR.get_or_init(|| toxi_addr);
});

let mut local_host = match std::env::var_os("MEMCACHED_HOST") {
Some(v) => v.into_string().unwrap(),
None => "http://127.0.0.1".to_string(), // use IPV4 so that it resolves to a single Server
};
if let Some(stripped) = local_host.strip_prefix("http://") {
local_host = stripped.to_string();
}

let local_memcached_port = match std::env::var_os("MEMCACHED_PORT") {
Some(v) => v.into_string().unwrap(),
None => "11211".to_string(),
};

let local_url = format!("{}:{}", local_host, local_memcached_port);

let local_port = PROXY_PORT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

let toxi_addr = TOXI_ADDR.get().unwrap();
let toxic_local_addr = format!("{}:{}", toxi_addr.ip(), local_port);

let proxy = ProxyPack::new(
format!("local-memcached-{}", local_port),
toxic_local_addr.clone(),
local_url.clone(),
);

let toxiproxy_client = ToxiproxyClient::new(toxi_addr);
assert!(toxiproxy_client.is_running());

let proxy = toxiproxy_client.populate(vec![proxy]).unwrap();
let proxy = proxy
.into_iter()
.map(|proxy| ProxyDrop { proxy })
.next()
.unwrap();

(proxy, toxic_local_addr)
}

async fn setup_clean_client() -> Client {
let mut local_host = match std::env::var_os("MEMCACHED_HOST") {
Some(v) => v.into_string().unwrap(),
None => "http://127.0.0.1".to_string(),
};
if let Some(stripped) = local_host.strip_prefix("http://") {
local_host = stripped.to_string();
}

let local_memcached_port = match std::env::var_os("MEMCACHED_PORT") {
Some(v) => v.into_string().unwrap(),
None => "11211".to_string(),
};

Client::new(format!("tcp://{}:{}", local_host, local_memcached_port,))
.await
.unwrap()
}

async fn setup_toxic_client(toxic_local_url: &String) -> Client {
Client::new(toxic_local_url).await.unwrap()
}

fn setup_runtime() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
}

async fn clear_keys(client: &mut Client, keys: &[&str]) {
for key in keys {
let _ = client.delete(key).await;
let result = client.get(key).await;
assert_eq!(result, Ok(None));
}
}

fn setup_runtime_and_clients(
toxic_local_url: &String,
keys: &Vec<&str>,
) -> (tokio::runtime::Runtime, Client, Client) {
let rt = setup_runtime();
let mut clean_client = rt.block_on(setup_clean_client());
let toxic_client = rt.block_on(setup_toxic_client(toxic_local_url));

rt.block_on(clear_keys(&mut clean_client, &keys));

(rt, clean_client, toxic_client)
}

#[cfg(test)]
mod tests {
use super::*;

#[ignore = "Relies on a running memcached server and toxiproxy service"]
#[test]
fn test_set_multi_succeeds_with_clean_client() {
let rt = setup_runtime();

let keys = vec!["clean-key1", "clean-key2", "clean-key3"];
let values = vec!["value1", "value2", "value3"];
let kv: Vec<(&str, &str)> = keys.clone().into_iter().zip(values).collect();

let mut clean_client = rt.block_on(setup_clean_client());

rt.block_on(clear_keys(&mut clean_client, &keys));

let result = rt.block_on(async { clean_client.set_multi(&kv, None, None).await });

assert!(result.is_ok());
}

#[ignore = "Relies on a running memcached server and toxiproxy service"]
#[test]
fn test_set_multi_errors_with_toxic_client_via_with_down() {
let keys = vec!["with-down-key1", "with-down-key2", "with-down-key3"];
let values = vec!["value1", "value2", "value3"];

let (toxic_proxy, toxic_local_addr) = create_proxy_and_config();
let toxic_local_url = "tcp://".to_string() + &toxic_local_addr;

let (rt, _, mut toxic_client) = setup_runtime_and_clients(&toxic_local_url, &keys);

let kv: Vec<(&str, &str)> = keys.clone().into_iter().zip(values).collect();

let _ = toxic_proxy.with_down(|| {
rt.block_on(async {
let result = toxic_client.set_multi(&kv, None, None).await;
assert_eq!(
result,
Err(async_memcached::Error::Io(
std::io::ErrorKind::UnexpectedEof.into()
))
);
});
});
}

#[ignore = "Relies on a running memcached server and toxiproxy service"]
#[test]
fn test_set_multi_errors_on_upstream_with_toxic_client_via_limit_data() {
let keys = vec!["upstream-key1", "upstream-key2", "upstream-key3"];
let values = vec!["value1", "value2", "value3"];

let (toxic_proxy, toxic_local_addr) = create_proxy_and_config();
let toxic_local_url = "tcp://".to_string() + &toxic_local_addr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a lot of boiler plate code here, can we turn it into some helper methods?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good callout, I'll clean this up 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did a refactoring pass on this. Reconfigured everything so that it's making proper use of environment variables, methods make more sense in the context that they're being use in (e.g. no more unnecessary vecs / proxies[0] since we're only ever making one proxy per test) tests should be more parsable and it should be easier to add more tests in the future.


let (rt, mut clean_client, mut toxic_client) =
setup_runtime_and_clients(&toxic_local_url, &keys);

let multiset_command =
keys.iter()
.zip(values.iter())
.fold(String::new(), |mut acc, (key, value)| {
acc.push_str(&format!("set {} 0 0 {}\r\n{}\r\n", key, value.len(), value));
acc
});

// Simulate a network error happening when the client makes a request to the server. Only part of the request is received by the server.
// In this case, the server can only cache values for the keys with complete commands.

let byte_limit = multiset_command.len() - 10; // First two commands should be intact, last one cut off

let _ = toxic_proxy
.with_limit_data("upstream".into(), byte_limit as u32, 1.0)
.apply(|| {
rt.block_on(async {
let kv: Vec<(&str, &str)> =
keys.clone().into_iter().zip(values.clone()).collect();
let result = toxic_client.set_multi(&kv, None, None).await;

assert_eq!(
result,
Err(async_memcached::Error::Io(
std::io::ErrorKind::UnexpectedEof.into()
))
);
});
});

// Use a clean client to check that the first two keys were stored and last was not
let get_result = rt.block_on(async { clean_client.get("upstream-key1").await });
assert!(matches!(
std::str::from_utf8(
&get_result
.expect("should have unwrapped a Result")
.expect("should have unwrapped an Option")
.data
)
.expect("failed to parse string from bytes"),
"value1"
));

let get_result = rt.block_on(async { clean_client.get("upstream-key2").await });
assert!(matches!(
std::str::from_utf8(
&get_result
.expect("should have unwrapped a Result")
.expect("should have unwrapped an Option")
.data
)
.expect("failed to parse string from bytes"),
"value2"
));

let get_result = rt.block_on(async { clean_client.get("upstream-key3").await });
assert_eq!(get_result, Ok(None));
}

#[ignore = "Relies on a running memcached server and toxiproxy service"]
#[test]
fn test_set_multi_errors_on_downstream_with_toxic_client_via_limit_data() {
let keys = vec!["downstream-key1", "downstream-key2", "downstream-key3"];
let values = vec!["value1", "value2", "value3"];

let (toxic_proxy, toxic_local_addr) = create_proxy_and_config();
let toxic_local_url = "tcp://".to_string() + &toxic_local_addr;

let (rt, mut clean_client, mut toxic_client) =
setup_runtime_and_clients(&toxic_local_url, &keys);

// Simulate a network error happening when the server responds back to the client. A complete response is received for the first key but then
// the connection is closed before the other responses are received. Regardless, the server should still cache all the data.
let byte_limit = "STORED\r\n".as_bytes().len() + 1;

let _ = toxic_proxy
.with_limit_data("downstream".into(), byte_limit as u32, 1.0)
.apply(|| {
rt.block_on(async {
let kv: Vec<(&str, &str)> =
keys.clone().into_iter().zip(values.clone()).collect();

let set_result = toxic_client.set_multi(&kv, None, None).await;

assert_eq!(
set_result,
Err(async_memcached::Error::Io(
std::io::ErrorKind::UnexpectedEof.into()
))
);
});
});

// Use a clean client to check that all values were cached by the server despite the interrupted server response.
for (key, _expected_value) in keys.iter().zip(values.iter()) {
let get_result = rt.block_on(async { clean_client.get(key).await });
assert!(matches!(
std::str::from_utf8(
&get_result
.expect("should have unwrapped a Result")
.expect("should have unwrapped an Option")
.data
)
.expect("failed to parse string from bytes"),
_expected_value
));
}
}
}
Loading