Skip to content

Commit

Permalink
Add store to save subscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
ogzhanolguncu committed Sep 17, 2023
1 parent 1b4a6f7 commit e2144e9
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ extern crate log;

mod handle_stream;
mod nats;
mod store;

use handle_stream::handle_stream;
use std::net::{Ipv4Addr, SocketAddrV4, TcpListener};
Expand Down
120 changes: 120 additions & 0 deletions src/store/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Subject(String);

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SubscriptionId(String);

#[derive(Debug, Clone)]
pub struct MessageBrokerStore {
data: Arc<RwLock<HashMap<Subject, HashSet<SubscriptionId>>>>,

Check failure on line 12 in src/store/mod.rs

View workflow job for this annotation

GitHub Actions / Clippy

field `data` is never read

Check warning on line 12 in src/store/mod.rs

View workflow job for this annotation

GitHub Actions / Check

field `data` is never read
}

impl MessageBrokerStore {
pub fn new() -> Self {

Check failure on line 16 in src/store/mod.rs

View workflow job for this annotation

GitHub Actions / Clippy

associated items `new`, `add_subscription`, `remove_subscription`, and `list_subscriptions` are never used

Check warning on line 16 in src/store/mod.rs

View workflow job for this annotation

GitHub Actions / Check

associated items `new`, `add_subscription`, `remove_subscription`, and `list_subscriptions` are never used
MessageBrokerStore {
data: Arc::new(RwLock::new(HashMap::new())),
}
}

pub fn add_subscription(
&self,
subject: Subject,
sub: SubscriptionId,
) -> Result<bool, &'static str> {
self.data

Check failure on line 27 in src/store/mod.rs

View workflow job for this annotation

GitHub Actions / Clippy

using `Result.and_then(|x| Ok(y))`, which is more succinctly expressed as `map(|x| y)`
.write()
.map_err(|_| "Could not acquire data write lock")
.and_then(|mut data| Ok(data.entry(subject).or_insert_with(HashSet::new).insert(sub)))
}

pub fn remove_subscription(
&self,
subject: &Subject,
sub: &SubscriptionId,
) -> Result<bool, &'static str> {
self.data
.write()
.map_err(|_| "Could not acquire data write lock")
.and_then(|mut data| {
data.get_mut(subject)
.ok_or("Could not get subject")
.map(|subs| subs.remove(sub))
})
}

pub fn list_subscriptions(
&self,
subject: &Subject,
) -> Result<Vec<SubscriptionId>, &'static str> {
match self.data.read() {
Ok(data) => Ok(data
.get(subject)
.cloned()
.map(|hs| hs.into_iter().collect())
.unwrap_or_else(Vec::new)),
Err(_) => Err("Could not acquire data read lock"),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn should_add_subscriber_to_cache() {
let cache = MessageBrokerStore::new();
cache
.add_subscription(Subject("FOO".to_string()), SubscriptionId("10".to_string()))
.unwrap();
cache
.add_subscription(Subject("FOO".to_string()), SubscriptionId("10".to_string()))
.unwrap();
cache
.add_subscription(Subject("FOO".to_string()), SubscriptionId("10".to_string()))
.unwrap();

assert_eq!(
1,
cache
.list_subscriptions(&Subject("FOO".to_string()))
.unwrap()
.len()
)
}

#[test]
fn should_remove_subscriber_from_cache() {
let cache = MessageBrokerStore::new();
cache
.add_subscription(Subject("FOO".to_string()), SubscriptionId("10".to_string()))
.unwrap();
cache
.add_subscription(Subject("FOO".to_string()), SubscriptionId("11".to_string()))
.unwrap();
cache
.remove_subscription(
&Subject("FOO".to_string()),
&SubscriptionId("10".to_string()),
)
.unwrap();

assert_eq!(
1,
cache
.list_subscriptions(&Subject("FOO".to_string()))
.unwrap()
.len()
)
}
}

// let mut map: HashMap<&str, String> = HashMap::new();
// let s = "hoho".to_string();

// map.entry("poneyland").or_insert_with(|| s);

// assert_eq!(map["poneyland"], "hoho".to_string());

0 comments on commit e2144e9

Please sign in to comment.