Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kv::watch_many functions #1364

Merged
merged 5 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 121 additions & 1 deletion async-nats/src/jetstream/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,42 @@ impl Store {
.await
}

/// Creates a [futures::Stream] over [Entries][Entry] a given key in the bucket, starting from
/// Creates a [futures::Stream] over [Entries][Entry] in the bucket, which yields
/// values whenever there are changes for given keys.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::StreamExt;
/// let client = async_nats::connect("demo.nats.io:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let kv = jetstream
/// .create_key_value(async_nats::jetstream::kv::Config {
/// bucket: "kv".to_string(),
/// history: 10,
/// ..Default::default()
/// })
/// .await?;
/// let mut entries = kv.watch_many(["foo", "bar"]).await?;
/// while let Some(entry) = entries.next().await {
/// println!("entry: {:?}", entry);
/// }
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "server_2_10")]
pub async fn watch_many<T, K>(&self, keys: K) -> Result<Watch, WatchError>
where
T: AsRef<str>,
K: IntoIterator<Item = T>,
{
self.watch_many_with_deliver_policy(keys, DeliverPolicy::New)
.await
}

/// Creates a [futures::Stream] over [Entries][Entry] for a given key in the bucket, starting from
/// provided revision. This is useful to resume watching over big KV buckets without a need to
/// replay all the history.
///
Expand Down Expand Up @@ -577,6 +612,91 @@ impl Store {
.await
}

/// Creates a [futures::Stream] over [Entries][Entry] a given keys in the bucket, which yields
/// values whenever there are changes for those keys with as well as last value.
/// This requires server version > 2.10 as it uses consumers with multiple subject filters.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::StreamExt;
/// let client = async_nats::connect("demo.nats.io:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let kv = jetstream
/// .create_key_value(async_nats::jetstream::kv::Config {
/// bucket: "kv".to_string(),
/// history: 10,
/// ..Default::default()
/// })
/// .await?;
/// let mut entries = kv.watch_many_with_history(["key1", "key2"]).await?;
/// while let Some(entry) = entries.next().await {
/// println!("entry: {:?}", entry);
/// }
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "server_2_10")]
pub async fn watch_many_with_history<T: AsRef<str>, K: IntoIterator<Item = T>>(
&self,
keys: K,
) -> Result<Watch, WatchError> {
self.watch_many_with_deliver_policy(keys, DeliverPolicy::LastPerSubject)
.await
}

#[cfg(feature = "server_2_10")]
async fn watch_many_with_deliver_policy<T: AsRef<str>, K: IntoIterator<Item = T>>(
&self,
keys: K,
deliver_policy: DeliverPolicy,
) -> Result<Watch, WatchError> {
let subjects = keys
.into_iter()
.map(|key| {
let key = key.as_ref();
format!("{}{}", self.prefix.as_str(), key)
})
.collect::<Vec<_>>();

debug!("initial consumer creation");
let consumer = self
.stream
.create_consumer(super::consumer::push::OrderedConfig {
deliver_subject: self.stream.context.client.new_inbox(),
description: Some("kv watch consumer".to_string()),
filter_subjects: subjects,
replay_policy: super::consumer::ReplayPolicy::Instant,
deliver_policy,
..Default::default()
})
.await
.map_err(|err| match err.kind() {
crate::jetstream::stream::ConsumerErrorKind::TimedOut => {
WatchError::new(WatchErrorKind::TimedOut)
}
_ => WatchError::with_source(WatchErrorKind::Other, err),
})?;

Ok(Watch {
no_messages: deliver_policy != DeliverPolicy::New
&& consumer.cached_info().num_pending == 0,
subscription: consumer.messages().await.map_err(|err| match err.kind() {
crate::jetstream::consumer::StreamErrorKind::TimedOut => {
WatchError::new(WatchErrorKind::TimedOut)
}
crate::jetstream::consumer::StreamErrorKind::Other => {
WatchError::with_source(WatchErrorKind::Other, err)
}
})?,
prefix: self.prefix.clone(),
bucket: self.name.clone(),
seen_current: false,
})
}

async fn watch_with_deliver_policy<T: AsRef<str>>(
&self,
key: T,
Expand Down
165 changes: 163 additions & 2 deletions async-nats/tests/kv_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// limitations under the License.

mod kv {
use std::{str::from_utf8, time::Duration};
use std::{collections::HashMap, str::from_utf8, time::Duration};

use async_nats::{
jetstream::{
Expand Down Expand Up @@ -559,7 +559,6 @@ mod kv {
async fn watch() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = ConnectOptions::new()
.event_callback(|event| async move { println!("event: {event:?}") })
.connect(server.client_url())
.await
.unwrap();
Expand Down Expand Up @@ -613,6 +612,81 @@ mod kv {
}
}
}
#[tokio::test]
#[cfg(feature = "server_2_10")]
async fn watch_many() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = ConnectOptions::new()
.connect(server.client_url())
.await
.unwrap();

let context = async_nats::jetstream::new(client);

let kv = context
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "history".into(),
description: "test_description".into(),
history: 15,
storage: StorageType::File,
num_replicas: 1,
..Default::default()
})
.await
.unwrap();

// check if we get only updated values. This should not pop up in watcher.
kv.put("foo", 22.to_string().into()).await.unwrap();
kv.put("bar", 22.to_string().into()).await.unwrap();
let mut watch = kv.watch_many(["foo.>", "bar.>"]).await.unwrap();

tokio::task::spawn({
let kv = kv.clone();
async move {
for i in 0..10 {
tokio::time::sleep(Duration::from_millis(50)).await;
kv.put(format!("foo.{i}"), i.to_string().into())
.await
.unwrap();
}
}
});
tokio::task::spawn({
let kv = kv.clone();
async move {
for i in 0..10 {
tokio::time::sleep(Duration::from_millis(50)).await;
kv.put(format!("bar.{i}"), i.to_string().into())
.await
.unwrap();
}
}
});

tokio::task::spawn({
let kv = kv.clone();
async move {
for i in 0..10 {
tokio::time::sleep(Duration::from_millis(50)).await;
kv.put("var", i.to_string().into()).await.unwrap();
}
}
});

let mut keys = HashMap::new();
for i in 0..10 {
keys.insert(format!("foo.{i}"), ());
keys.insert(format!("bar.{i}"), ());
}
while let Some(entry) = watch.next().await {
let entry = entry.unwrap();
assert!(keys.contains_key(&entry.key));
keys.remove(&entry.key);
if keys.is_empty() {
break;
}
}
}

#[tokio::test]
async fn watch_seen_current() {
Expand Down Expand Up @@ -752,6 +826,93 @@ mod kv {
}
}

#[tokio::test]
#[cfg(feature = "server_2_10")]
async fn watch_many_with_history() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = ConnectOptions::new()
.event_callback(|event| async move { println!("event: {event:?}") })
.connect(server.client_url())
.await
.unwrap();

let context = async_nats::jetstream::new(client);

let kv = context
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "history".into(),
description: "test_description".into(),
history: 15,
storage: StorageType::File,
num_replicas: 1,
..Default::default()
})
.await
.unwrap();

// check if we get updated values. This should not pop up in watcher.
kv.put("foo.bar", 42.to_string().into()).await.unwrap();
let mut watch = kv
.watch_many_with_history(["foo.>", "bar.>"])
.await
.unwrap();

tokio::task::spawn({
let kv = kv.clone();
async move {
for i in 0..10 {
tokio::time::sleep(Duration::from_millis(50)).await;
kv.put(format!("foo.{i}"), i.to_string().into())
.await
.unwrap();
}
}
});

tokio::task::spawn({
let kv = kv.clone();
async move {
for i in 0..10 {
tokio::time::sleep(Duration::from_millis(50)).await;
kv.put("var", i.to_string().into()).await.unwrap();
}
}
});

tokio::task::spawn({
let kv = kv.clone();
async move {
for i in 0..10 {
tokio::time::sleep(Duration::from_millis(50)).await;
kv.put(format!("bar.{i}"), i.to_string().into())
.await
.unwrap();
}
}
});

let entry = watch.next().await.unwrap();
let entry = entry.unwrap();
assert_eq!("foo.bar", entry.key);

let mut keys = HashMap::new();
for i in 0..10 {
keys.insert(format!("foo.{i}"), ());
keys.insert(format!("bar.{i}"), ());
}

// make sure we get the rest correctly
while let Some(entry) = watch.next().await {
let entry = entry.unwrap();
// we now start at 1, we've done one iteration
assert!(keys.contains_key(&entry.key));
keys.remove(&entry.key);
if keys.is_empty() {
break;
}
}
}

#[tokio::test]
async fn watch_all() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down
Loading