From e2144e995627339efd7772901352684fb8a4abbf Mon Sep 17 00:00:00 2001 From: ogzhanolguncu Date: Sun, 17 Sep 2023 11:47:12 +0300 Subject: [PATCH] Add store to save subscribers --- src/main.rs | 1 + src/store/mod.rs | 120 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 121 insertions(+) create mode 100644 src/store/mod.rs diff --git a/src/main.rs b/src/main.rs index 10ee956..64ef9e6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ extern crate log; mod handle_stream; mod nats; +mod store; use handle_stream::handle_stream; use std::net::{Ipv4Addr, SocketAddrV4, TcpListener}; diff --git a/src/store/mod.rs b/src/store/mod.rs new file mode 100644 index 0000000..1170ec4 --- /dev/null +++ b/src/store/mod.rs @@ -0,0 +1,120 @@ +use std::collections::{HashMap, HashSet}; +use std::sync::{Arc, RwLock}; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Subject(String); + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct SubscriptionId(String); + +#[derive(Debug, Clone)] +pub struct MessageBrokerStore { + data: Arc>>>, +} + +impl MessageBrokerStore { + pub fn new() -> Self { + MessageBrokerStore { + data: Arc::new(RwLock::new(HashMap::new())), + } + } + + pub fn add_subscription( + &self, + subject: Subject, + sub: SubscriptionId, + ) -> Result { + self.data + .write() + .map_err(|_| "Could not acquire data write lock") + .and_then(|mut data| Ok(data.entry(subject).or_insert_with(HashSet::new).insert(sub))) + } + + pub fn remove_subscription( + &self, + subject: &Subject, + sub: &SubscriptionId, + ) -> Result { + self.data + .write() + .map_err(|_| "Could not acquire data write lock") + .and_then(|mut data| { + data.get_mut(subject) + .ok_or("Could not get subject") + .map(|subs| subs.remove(sub)) + }) + } + + pub fn list_subscriptions( + &self, + subject: &Subject, + ) -> Result, &'static str> { + match self.data.read() { + Ok(data) => Ok(data + .get(subject) + .cloned() + .map(|hs| hs.into_iter().collect()) + .unwrap_or_else(Vec::new)), + Err(_) => Err("Could not acquire data read lock"), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn should_add_subscriber_to_cache() { + let cache = MessageBrokerStore::new(); + cache + .add_subscription(Subject("FOO".to_string()), SubscriptionId("10".to_string())) + .unwrap(); + cache + .add_subscription(Subject("FOO".to_string()), SubscriptionId("10".to_string())) + .unwrap(); + cache + .add_subscription(Subject("FOO".to_string()), SubscriptionId("10".to_string())) + .unwrap(); + + assert_eq!( + 1, + cache + .list_subscriptions(&Subject("FOO".to_string())) + .unwrap() + .len() + ) + } + + #[test] + fn should_remove_subscriber_from_cache() { + let cache = MessageBrokerStore::new(); + cache + .add_subscription(Subject("FOO".to_string()), SubscriptionId("10".to_string())) + .unwrap(); + cache + .add_subscription(Subject("FOO".to_string()), SubscriptionId("11".to_string())) + .unwrap(); + cache + .remove_subscription( + &Subject("FOO".to_string()), + &SubscriptionId("10".to_string()), + ) + .unwrap(); + + assert_eq!( + 1, + cache + .list_subscriptions(&Subject("FOO".to_string())) + .unwrap() + .len() + ) + } +} + +// let mut map: HashMap<&str, String> = HashMap::new(); +// let s = "hoho".to_string(); + +// map.entry("poneyland").or_insert_with(|| s); + +// assert_eq!(map["poneyland"], "hoho".to_string());