From d9ee7b1b8bcdbb849419ca9d4744b3cb06ddc81e Mon Sep 17 00:00:00 2001 From: Graham King Date: Wed, 8 Oct 2025 17:17:45 -0400 Subject: [PATCH 1/3] feat: Introduce storage_client in DistributedRuntime This will gradually replace `etcd_client`. It is backed by etcd normally, or memory when etcd is not available. Also use it to list instances, as an example and first usage. This is the same interface that ModelDeploymentCards have always used for publishing and reading, so it is well tested. Signed-off-by: Graham King --- lib/llm/src/http/service/health.rs | 14 ++++------- lib/llm/src/http/service/service_v2.rs | 19 +++++++++++--- lib/runtime/src/component.rs | 23 +++++++---------- lib/runtime/src/distributed.rs | 18 ++++++++++--- lib/runtime/src/instances.rs | 25 +++++++++---------- lib/runtime/src/lib.rs | 5 +++- lib/runtime/src/storage/key_value_store.rs | 2 ++ .../src/storage/key_value_store/etcd.rs | 6 +++++ .../src/storage/key_value_store/mem.rs | 7 ++++++ .../src/storage/key_value_store/nats.rs | 4 +++ 10 files changed, 79 insertions(+), 44 deletions(-) diff --git a/lib/llm/src/http/service/health.rs b/lib/llm/src/http/service/health.rs index 6be55254ad..0409aaef32 100644 --- a/lib/llm/src/http/service/health.rs +++ b/lib/llm/src/http/service/health.rs @@ -52,16 +52,12 @@ async fn live_handler( async fn health_handler( axum::extract::State(state): axum::extract::State>, ) -> impl IntoResponse { - let instances = if let Some(etcd_client) = state.etcd_client() { - match list_all_instances(etcd_client).await { - Ok(instances) => instances, - Err(err) => { - tracing::warn!("Failed to fetch instances from etcd: {}", err); - vec![] - } + let instances = match list_all_instances(state.storage_client()).await { + Ok(instances) => instances, + Err(err) => { + tracing::warn!(%err, "Failed to fetch instances from storage"); + vec![] } - } else { - vec![] }; let mut endpoints: Vec = instances diff --git a/lib/llm/src/http/service/service_v2.rs b/lib/llm/src/http/service/service_v2.rs index 67df8a5074..f316ddd0cf 100644 --- a/lib/llm/src/http/service/service_v2.rs +++ b/lib/llm/src/http/service/service_v2.rs @@ -19,6 +19,9 @@ use anyhow::Result; use axum_server::tls_rustls::RustlsConfig; use derive_builder::Builder; use dynamo_runtime::logging::make_request_span; +use dynamo_runtime::storage::key_value_store::EtcdStorage; +use dynamo_runtime::storage::key_value_store::KeyValueStore; +use dynamo_runtime::storage::key_value_store::MemoryStorage; use dynamo_runtime::transports::etcd; use std::net::SocketAddr; use tokio::task::JoinHandle; @@ -26,11 +29,11 @@ use tokio_util::sync::CancellationToken; use tower_http::trace::TraceLayer; /// HTTP service shared state -#[derive(Default)] pub struct State { metrics: Arc, manager: Arc, etcd_client: Option, + storage_client: Arc, flags: StateFlags, } @@ -76,6 +79,7 @@ impl State { manager, metrics: Arc::new(Metrics::default()), etcd_client: None, + storage_client: Arc::new(MemoryStorage::new()), flags: StateFlags { chat_endpoints_enabled: AtomicBool::new(false), cmpl_endpoints_enabled: AtomicBool::new(false), @@ -85,11 +89,12 @@ impl State { } } - pub fn new_with_etcd(manager: Arc, etcd_client: Option) -> Self { + pub fn new_with_etcd(manager: Arc, etcd_client: etcd::Client) -> Self { Self { manager, metrics: Arc::new(Metrics::default()), - etcd_client, + storage_client: Arc::new(EtcdStorage::new(etcd_client.clone())), + etcd_client: Some(etcd_client), flags: StateFlags { chat_endpoints_enabled: AtomicBool::new(false), cmpl_endpoints_enabled: AtomicBool::new(false), @@ -115,6 +120,10 @@ impl State { self.etcd_client.as_ref() } + pub fn storage_client(&self) -> Arc { + self.storage_client.clone() + } + // TODO pub fn sse_keep_alive(&self) -> Option { None @@ -294,7 +303,9 @@ impl HttpServiceConfigBuilder { let config: HttpServiceConfig = self.build_internal()?; let model_manager = Arc::new(ModelManager::new()); - let etcd_client = config.etcd_client; + let Some(etcd_client) = config.etcd_client else { + anyhow::bail!("Missing etcd_client in config, building HttpServiceConfig"); + }; let state = Arc::new(State::new_with_etcd(model_manager, etcd_client)); state diff --git a/lib/runtime/src/component.rs b/lib/runtime/src/component.rs index 9b38c1c12c..eac6219951 100644 --- a/lib/runtime/src/component.rs +++ b/lib/runtime/src/component.rs @@ -240,27 +240,22 @@ impl Component { } pub async fn list_instances(&self) -> anyhow::Result> { - let Some(etcd_client) = self.drt.etcd_client() else { + let client = self.drt.storage_client(); + let Some(bucket) = client.get_bucket(&self.etcd_root()).await? else { return Ok(vec![]); }; - let mut out = vec![]; - // The extra slash is important to only list exact component matches, not substrings. - for kv in etcd_client - .kv_get_prefix(format!("{}/", self.etcd_root())) - .await? - { - let val = match serde_json::from_slice::(kv.value()) { + let entries = bucket.entries().await?; + let mut instances = Vec::with_capacity(entries.len()); + for (name, bytes) in entries.into_iter() { + let val = match serde_json::from_slice::(&bytes) { Ok(val) => val, Err(err) => { - anyhow::bail!( - "Error converting etcd response to Instance: {err}. {}", - kv.value_str()? - ); + anyhow::bail!("Error converting storage response to Instance: {err}. {name}",); } }; - out.push(val); + instances.push(val); } - Ok(out) + Ok(instances) } /// Scrape ServiceSet, which contains NATS stats as well as user defined stats diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index 85b0964f35..dec6bded0d 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 pub use crate::component::Component; +use crate::storage::key_value_store::{EtcdStorage, KeyValueStore, MemoryStorage}; use crate::transports::nats::DRTNatsClientPrometheusMetrics; use crate::{ ErrorContext, RuntimeCallback, @@ -44,10 +45,15 @@ impl DistributedRuntime { let runtime_clone = runtime.clone(); - let etcd_client = if is_static { - None + let (etcd_client, storage_client) = if is_static { + let storage_client: Arc = Arc::new(MemoryStorage::new()); + (None, storage_client) } else { - Some(etcd::Client::new(etcd_config.clone(), runtime_clone).await?) + let etcd_client = etcd::Client::new(etcd_config.clone(), runtime_clone).await?; + let storage_client: Arc = + Arc::new(EtcdStorage::new(etcd_client.clone())); + + (Some(etcd_client), storage_client) }; let nats_client = nats_config.clone().connect().await?; @@ -77,6 +83,7 @@ impl DistributedRuntime { let distributed_runtime = Self { runtime, etcd_client, + storage_client, nats_client, tcp_server: Arc::new(OnceCell::new()), system_status_server: Arc::new(OnceLock::new()), @@ -270,6 +277,11 @@ impl DistributedRuntime { self.etcd_client.clone() } + /// An interface to store things. Will eventually replace `etcd_client`. + pub fn storage_client(&self) -> Arc { + self.storage_client.clone() + } + pub fn child_token(&self) -> CancellationToken { self.runtime.child_token() } diff --git a/lib/runtime/src/instances.rs b/lib/runtime/src/instances.rs index 0cc88b3a0c..3c0a1942d6 100644 --- a/lib/runtime/src/instances.rs +++ b/lib/runtime/src/instances.rs @@ -7,25 +7,24 @@ //! the entire distributed system, complementing the component-specific //! instance listing in `component.rs`. +use std::sync::Arc; + use crate::component::{INSTANCE_ROOT_PATH, Instance}; +use crate::storage::key_value_store::KeyValueStore; use crate::transports::etcd::Client as EtcdClient; -pub async fn list_all_instances(etcd_client: &EtcdClient) -> anyhow::Result> { - let mut instances = Vec::new(); +pub async fn list_all_instances(client: Arc) -> anyhow::Result> { + let Some(bucket) = client.get_bucket(INSTANCE_ROOT_PATH).await? else { + return Ok(vec![]); + }; - for kv in etcd_client - .kv_get_prefix(format!("{}/", INSTANCE_ROOT_PATH)) - .await? - { - match serde_json::from_slice::(kv.value()) { + let entries = bucket.entries().await?; + let mut instances = Vec::with_capacity(entries.len()); + for (name, bytes) in entries.into_iter() { + match serde_json::from_slice::(&bytes) { Ok(instance) => instances.push(instance), Err(err) => { - tracing::warn!( - "Failed to parse instance from etcd: {}. Key: {}, Value: {}", - err, - kv.key_str().unwrap_or("invalid_key"), - kv.value_str().unwrap_or("invalid_value") - ); + tracing::warn!(%err, key = name, "Failed to parse instance from storage"); } } } diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs index 7162ff2751..1e0f1346db 100644 --- a/lib/runtime/src/lib.rs +++ b/lib/runtime/src/lib.rs @@ -51,7 +51,9 @@ pub use system_health::{HealthCheckTarget, SystemHealth}; pub use tokio_util::sync::CancellationToken; pub use worker::Worker; -use crate::metrics::prometheus_names::distributed_runtime; +use crate::{ + metrics::prometheus_names::distributed_runtime, storage::key_value_store::KeyValueStore, +}; use component::{Endpoint, InstanceSource}; use utils::GracefulShutdownTracker; @@ -152,6 +154,7 @@ pub struct DistributedRuntime { // we might consider a unifed transport manager here etcd_client: Option, nats_client: transports::nats::Client, + storage_client: Arc, tcp_server: Arc>>, system_status_server: Arc>>, diff --git a/lib/runtime/src/storage/key_value_store.rs b/lib/runtime/src/storage/key_value_store.rs index 3cbd4dbf77..0f4296637f 100644 --- a/lib/runtime/src/storage/key_value_store.rs +++ b/lib/runtime/src/storage/key_value_store.rs @@ -75,6 +75,8 @@ pub trait KeyValueStore: Send + Sync { &self, bucket_name: &str, ) -> Result>, StorageError>; + + fn connection_id(&self) -> u64; } pub struct KeyValueStoreManager(Box); diff --git a/lib/runtime/src/storage/key_value_store/etcd.rs b/lib/runtime/src/storage/key_value_store/etcd.rs index 5271d855d1..5967dc888e 100644 --- a/lib/runtime/src/storage/key_value_store/etcd.rs +++ b/lib/runtime/src/storage/key_value_store/etcd.rs @@ -45,6 +45,12 @@ impl KeyValueStore for EtcdStorage { bucket_name: bucket_name.to_string(), }))) } + + fn connection_id(&self) -> u64 { + // This conversion from i64 to u64 is safe because etcd lease IDs are u64 internally. + // They present as i64 because of the limitations of the etcd grpc/HTTP JSON API. + self.client.lease_id() as u64 + } } pub struct EtcdBucket { diff --git a/lib/runtime/src/storage/key_value_store/mem.rs b/lib/runtime/src/storage/key_value_store/mem.rs index ea93360740..ac1b388fa8 100644 --- a/lib/runtime/src/storage/key_value_store/mem.rs +++ b/lib/runtime/src/storage/key_value_store/mem.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; +use rand::Rng as _; use tokio::sync::Mutex; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; @@ -18,6 +19,7 @@ use super::{KeyValueBucket, KeyValueStore, StorageError, StorageOutcome}; #[derive(Clone)] pub struct MemoryStorage { inner: Arc, + connection_id: u64, } impl Default for MemoryStorage { @@ -58,6 +60,7 @@ impl MemoryStorage { change_sender: tx, change_receiver: Mutex::new(rx), }), + connection_id: rand::rng().random(), } } } @@ -96,6 +99,10 @@ impl KeyValueStore for MemoryStorage { None => Ok(None), } } + + fn connection_id(&self) -> u64 { + self.connection_id + } } #[async_trait] diff --git a/lib/runtime/src/storage/key_value_store/nats.rs b/lib/runtime/src/storage/key_value_store/nats.rs index d88c53d205..7ebe4da54d 100644 --- a/lib/runtime/src/storage/key_value_store/nats.rs +++ b/lib/runtime/src/storage/key_value_store/nats.rs @@ -45,6 +45,10 @@ impl KeyValueStore for NATSStorage { None => Ok(None), } } + + fn connection_id(&self) -> u64 { + self.client.client().server_info().client_id + } } impl NATSStorage { From c882f29f1ee84533281f3fddc0e113307ac95b02 Mon Sep 17 00:00:00 2001 From: Graham King Date: Wed, 8 Oct 2025 17:50:37 -0400 Subject: [PATCH 2/3] Thanks Code Rabbit Signed-off-by: Graham King --- lib/llm/src/grpc/service/kserve.rs | 9 +++++--- lib/llm/src/http/service/service_v2.rs | 7 +++---- lib/runtime/src/component.rs | 29 +++++++++++++++++++++++++- lib/runtime/src/instances.rs | 1 + 4 files changed, 38 insertions(+), 8 deletions(-) diff --git a/lib/llm/src/grpc/service/kserve.rs b/lib/llm/src/grpc/service/kserve.rs index 68def72825..6c8b158729 100644 --- a/lib/llm/src/grpc/service/kserve.rs +++ b/lib/llm/src/grpc/service/kserve.rs @@ -57,11 +57,11 @@ impl State { } } - pub fn new_with_etcd(manager: Arc, etcd_client: Option) -> Self { + pub fn new_with_etcd(manager: Arc, etcd_client: etcd::Client) -> Self { Self { manager, metrics: Arc::new(Metrics::default()), - etcd_client, + etcd_client: Some(etcd_client), } } @@ -155,7 +155,10 @@ impl KserveServiceConfigBuilder { let config: KserveServiceConfig = self.build_internal()?; let model_manager = Arc::new(ModelManager::new()); - let state = Arc::new(State::new_with_etcd(model_manager, config.etcd_client)); + let state = match config.etcd_client { + Some(etcd_client) => Arc::new(State::new_with_etcd(model_manager, etcd_client)), + None => Arc::new(State::new(model_manager)), + }; // enable prometheus metrics let registry = metrics::Registry::new(); diff --git a/lib/llm/src/http/service/service_v2.rs b/lib/llm/src/http/service/service_v2.rs index f316ddd0cf..f8cd2ebc27 100644 --- a/lib/llm/src/http/service/service_v2.rs +++ b/lib/llm/src/http/service/service_v2.rs @@ -303,11 +303,10 @@ impl HttpServiceConfigBuilder { let config: HttpServiceConfig = self.build_internal()?; let model_manager = Arc::new(ModelManager::new()); - let Some(etcd_client) = config.etcd_client else { - anyhow::bail!("Missing etcd_client in config, building HttpServiceConfig"); + let state = match config.etcd_client { + Some(etcd_client) => Arc::new(State::new_with_etcd(model_manager, etcd_client)), + None => Arc::new(State::new(model_manager)), }; - let state = Arc::new(State::new_with_etcd(model_manager, etcd_client)); - state .flags .set(&EndpointType::Chat, config.enable_chat_endpoints); diff --git a/lib/runtime/src/component.rs b/lib/runtime/src/component.rs index eac6219951..31e5cadc98 100644 --- a/lib/runtime/src/component.rs +++ b/lib/runtime/src/component.rs @@ -29,6 +29,8 @@ //! //! TODO: Top-level Overview of Endpoints/Functions +use std::fmt; + use crate::{ config::HealthStatus, discovery::Lease, @@ -91,7 +93,7 @@ pub struct Registry { inner: Arc>, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct Instance { pub component: String, pub endpoint: String, @@ -113,6 +115,30 @@ impl Instance { } } +impl fmt::Display for Instance { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{}/{}/{}/{}", + self.namespace, self.component, self.endpoint, self.instance_id + ) + } +} + +/// Sort by string name +impl std::cmp::Ord for Instance { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.to_string().cmp(&other.to_string()) + } +} + +impl PartialOrd for Instance { + fn partial_cmp(&self, other: &Self) -> Option { + // Since Ord is fully implemented, the comparison is always total. + Some(self.cmp(other)) + } +} + /// A [Component] a discoverable entity in the distributed runtime. /// You can host [Endpoint] on a [Component] by first creating /// a [Service] then adding one or more [Endpoint] to the [Service]. @@ -255,6 +281,7 @@ impl Component { }; instances.push(val); } + instances.sort(); Ok(instances) } diff --git a/lib/runtime/src/instances.rs b/lib/runtime/src/instances.rs index 3c0a1942d6..f033e1804b 100644 --- a/lib/runtime/src/instances.rs +++ b/lib/runtime/src/instances.rs @@ -28,6 +28,7 @@ pub async fn list_all_instances(client: Arc) -> anyhow::Resul } } } + instances.sort(); Ok(instances) } From e2c90e6e5b7a3e2ed9ace706b138b14eda6623f8 Mon Sep 17 00:00:00 2001 From: Graham King Date: Fri, 10 Oct 2025 10:01:00 -0400 Subject: [PATCH 3/3] Rename things to progress code review `*Storage` -> `*Store` `storage_client` -> `store` `etcd_root` -> `instance_root` Add a comment explaining that although `store` is `dyn KeyValueStore` right now, it has ambitions to be the one-ring of Dynamo storage. Signed-off-by: Graham King --- lib/llm/src/http/service/health.rs | 4 +- lib/llm/src/http/service/service_v2.rs | 14 ++-- lib/llm/src/local_model.rs | 4 +- lib/llm/src/model_card.rs | 4 +- lib/runtime/src/component.rs | 12 ++-- lib/runtime/src/distributed.rs | 20 +++--- lib/runtime/src/lib.rs | 2 +- lib/runtime/src/storage/key_value_store.rs | 62 ++++++++--------- .../src/storage/key_value_store/etcd.rs | 62 ++++++++--------- .../src/storage/key_value_store/mem.rs | 50 +++++++------- .../src/storage/key_value_store/nats.rs | 66 +++++++++---------- 11 files changed, 150 insertions(+), 150 deletions(-) diff --git a/lib/llm/src/http/service/health.rs b/lib/llm/src/http/service/health.rs index 0409aaef32..5f007a9bd4 100644 --- a/lib/llm/src/http/service/health.rs +++ b/lib/llm/src/http/service/health.rs @@ -52,10 +52,10 @@ async fn live_handler( async fn health_handler( axum::extract::State(state): axum::extract::State>, ) -> impl IntoResponse { - let instances = match list_all_instances(state.storage_client()).await { + let instances = match list_all_instances(state.store()).await { Ok(instances) => instances, Err(err) => { - tracing::warn!(%err, "Failed to fetch instances from storage"); + tracing::warn!(%err, "Failed to fetch instances from store"); vec![] } }; diff --git a/lib/llm/src/http/service/service_v2.rs b/lib/llm/src/http/service/service_v2.rs index f8cd2ebc27..00e4439229 100644 --- a/lib/llm/src/http/service/service_v2.rs +++ b/lib/llm/src/http/service/service_v2.rs @@ -19,9 +19,9 @@ use anyhow::Result; use axum_server::tls_rustls::RustlsConfig; use derive_builder::Builder; use dynamo_runtime::logging::make_request_span; -use dynamo_runtime::storage::key_value_store::EtcdStorage; +use dynamo_runtime::storage::key_value_store::EtcdStore; use dynamo_runtime::storage::key_value_store::KeyValueStore; -use dynamo_runtime::storage::key_value_store::MemoryStorage; +use dynamo_runtime::storage::key_value_store::MemoryStore; use dynamo_runtime::transports::etcd; use std::net::SocketAddr; use tokio::task::JoinHandle; @@ -33,7 +33,7 @@ pub struct State { metrics: Arc, manager: Arc, etcd_client: Option, - storage_client: Arc, + store: Arc, flags: StateFlags, } @@ -79,7 +79,7 @@ impl State { manager, metrics: Arc::new(Metrics::default()), etcd_client: None, - storage_client: Arc::new(MemoryStorage::new()), + store: Arc::new(MemoryStore::new()), flags: StateFlags { chat_endpoints_enabled: AtomicBool::new(false), cmpl_endpoints_enabled: AtomicBool::new(false), @@ -93,7 +93,7 @@ impl State { Self { manager, metrics: Arc::new(Metrics::default()), - storage_client: Arc::new(EtcdStorage::new(etcd_client.clone())), + store: Arc::new(EtcdStore::new(etcd_client.clone())), etcd_client: Some(etcd_client), flags: StateFlags { chat_endpoints_enabled: AtomicBool::new(false), @@ -120,8 +120,8 @@ impl State { self.etcd_client.as_ref() } - pub fn storage_client(&self) -> Arc { - self.storage_client.clone() + pub fn store(&self) -> Arc { + self.store.clone() } // TODO diff --git a/lib/llm/src/local_model.rs b/lib/llm/src/local_model.rs index 87fcfaf7e5..09721b24e8 100644 --- a/lib/llm/src/local_model.rs +++ b/lib/llm/src/local_model.rs @@ -12,7 +12,7 @@ use dynamo_runtime::storage::key_value_store::Key; use dynamo_runtime::traits::DistributedRuntimeProvider; use dynamo_runtime::{ component::Endpoint, - storage::key_value_store::{EtcdStorage, KeyValueStore, KeyValueStoreManager}, + storage::key_value_store::{EtcdStore, KeyValueStore, KeyValueStoreManager}, }; use crate::entrypoint::RouterConfig; @@ -409,7 +409,7 @@ impl LocalModel { self.card.move_to_nats(nats_client.clone()).await?; // Publish the Model Deployment Card to KV store - let kvstore: Box = Box::new(EtcdStorage::new(etcd_client.clone())); + let kvstore: Box = Box::new(EtcdStore::new(etcd_client.clone())); let card_store = Arc::new(KeyValueStoreManager::new(kvstore)); let lease_id = endpoint.drt().primary_lease().map(|l| l.id()).unwrap_or(0); let key = Key::from_raw(endpoint.unique_path(lease_id)); diff --git a/lib/llm/src/model_card.rs b/lib/llm/src/model_card.rs index 55d547eaf9..17da0eee43 100644 --- a/lib/llm/src/model_card.rs +++ b/lib/llm/src/model_card.rs @@ -23,7 +23,7 @@ use anyhow::{Context, Result}; use derive_builder::Builder; use dynamo_runtime::DistributedRuntime; use dynamo_runtime::storage::key_value_store::{ - EtcdStorage, Key, KeyValueStore, KeyValueStoreManager, + EtcdStore, Key, KeyValueStore, KeyValueStoreManager, }; use dynamo_runtime::{slug::Slug, storage::key_value_store::Versioned, transports::nats}; use serde::{Deserialize, Serialize}; @@ -457,7 +457,7 @@ impl ModelDeploymentCard { // Should be impossible because we only get here on an etcd event anyhow::bail!("Missing etcd_client"); }; - let store: Box = Box::new(EtcdStorage::new(etcd_client)); + let store: Box = Box::new(EtcdStore::new(etcd_client)); let card_store = Arc::new(KeyValueStoreManager::new(store)); let Some(mut card) = card_store .load::(ROOT_PATH, mdc_key) diff --git a/lib/runtime/src/component.rs b/lib/runtime/src/component.rs index 31e5cadc98..e0c529d6cd 100644 --- a/lib/runtime/src/component.rs +++ b/lib/runtime/src/component.rs @@ -72,7 +72,7 @@ pub mod service; pub use client::{Client, InstanceSource}; -/// The root etcd path where each instance registers itself in etcd. +/// The root key-value path where each instance registers itself in. /// An instance is namespace+component+endpoint+lease_id and must be unique. pub const INSTANCE_ROOT_PATH: &str = "v1/instances"; @@ -223,8 +223,8 @@ impl MetricsRegistry for Component { } impl Component { - /// The component part of an instance path in etcd. - pub fn etcd_root(&self) -> String { + /// The component part of an instance path in key-value store. + pub fn instance_root(&self) -> String { let ns = self.namespace.name(); let cp = &self.name; format!("{INSTANCE_ROOT_PATH}/{ns}/{cp}") @@ -266,8 +266,8 @@ impl Component { } pub async fn list_instances(&self) -> anyhow::Result> { - let client = self.drt.storage_client(); - let Some(bucket) = client.get_bucket(&self.etcd_root()).await? else { + let client = self.drt.store(); + let Some(bucket) = client.get_bucket(&self.instance_root()).await? else { return Ok(vec![]); }; let entries = bucket.entries().await?; @@ -467,7 +467,7 @@ impl Endpoint { /// The endpoint part of an instance path in etcd pub fn etcd_root(&self) -> String { - let component_path = self.component.etcd_root(); + let component_path = self.component.instance_root(); let endpoint_name = &self.name; format!("{component_path}/{endpoint_name}") } diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index dec6bded0d..ac114167a5 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 pub use crate::component::Component; -use crate::storage::key_value_store::{EtcdStorage, KeyValueStore, MemoryStorage}; +use crate::storage::key_value_store::{EtcdStore, KeyValueStore, MemoryStore}; use crate::transports::nats::DRTNatsClientPrometheusMetrics; use crate::{ ErrorContext, RuntimeCallback, @@ -45,15 +45,14 @@ impl DistributedRuntime { let runtime_clone = runtime.clone(); - let (etcd_client, storage_client) = if is_static { - let storage_client: Arc = Arc::new(MemoryStorage::new()); - (None, storage_client) + let (etcd_client, store) = if is_static { + let store: Arc = Arc::new(MemoryStore::new()); + (None, store) } else { let etcd_client = etcd::Client::new(etcd_config.clone(), runtime_clone).await?; - let storage_client: Arc = - Arc::new(EtcdStorage::new(etcd_client.clone())); + let store: Arc = Arc::new(EtcdStore::new(etcd_client.clone())); - (Some(etcd_client), storage_client) + (Some(etcd_client), store) }; let nats_client = nats_config.clone().connect().await?; @@ -83,7 +82,7 @@ impl DistributedRuntime { let distributed_runtime = Self { runtime, etcd_client, - storage_client, + store, nats_client, tcp_server: Arc::new(OnceCell::new()), system_status_server: Arc::new(OnceLock::new()), @@ -278,8 +277,9 @@ impl DistributedRuntime { } /// An interface to store things. Will eventually replace `etcd_client`. - pub fn storage_client(&self) -> Arc { - self.storage_client.clone() + /// Currently does key-value, but will grow to include whatever we need to store. + pub fn store(&self) -> Arc { + self.store.clone() } pub fn child_token(&self) -> CancellationToken { diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs index 1e0f1346db..db954d6a3a 100644 --- a/lib/runtime/src/lib.rs +++ b/lib/runtime/src/lib.rs @@ -154,7 +154,7 @@ pub struct DistributedRuntime { // we might consider a unifed transport manager here etcd_client: Option, nats_client: transports::nats::Client, - storage_client: Arc, + store: Arc, tcp_server: Arc>>, system_status_server: Arc>>, diff --git a/lib/runtime/src/storage/key_value_store.rs b/lib/runtime/src/storage/key_value_store.rs index 0f4296637f..03cdc06724 100644 --- a/lib/runtime/src/storage/key_value_store.rs +++ b/lib/runtime/src/storage/key_value_store.rs @@ -17,11 +17,11 @@ use futures::StreamExt; use serde::{Deserialize, Serialize}; mod mem; -pub use mem::MemoryStorage; +pub use mem::MemoryStore; mod nats; -pub use nats::NATSStorage; +pub use nats::NATSStore; mod etcd; -pub use etcd::EtcdStorage; +pub use etcd::EtcdStore; /// A key that is safe to use directly in the KV store. #[derive(Debug, Clone, PartialEq)] @@ -69,12 +69,12 @@ pub trait KeyValueStore: Send + Sync { bucket_name: &str, // auto-delete items older than this ttl: Option, - ) -> Result, StorageError>; + ) -> Result, StoreError>; async fn get_bucket( &self, bucket_name: &str, - ) -> Result>, StorageError>; + ) -> Result>, StoreError>; fn connection_id(&self) -> u64; } @@ -90,7 +90,7 @@ impl KeyValueStoreManager { &self, bucket: &str, key: &Key, - ) -> Result, StorageError> { + ) -> Result, StoreError> { let Some(bucket) = self.0.get_bucket(bucket).await? else { // No bucket means no cards return Ok(None); @@ -103,7 +103,7 @@ impl KeyValueStoreManager { Ok(None) => Ok(None), Err(err) => { // TODO look at what errors NATS can give us and make more specific wrappers - Err(StorageError::NATSError(err.to_string())) + Err(StoreError::NATSError(err.to_string())) } } } @@ -116,7 +116,7 @@ impl KeyValueStoreManager { bucket_name: &str, bucket_ttl: Option, ) -> ( - tokio::task::JoinHandle>, + tokio::task::JoinHandle>, tokio::sync::mpsc::UnboundedReceiver, ) { let bucket_name = bucket_name.to_string(); @@ -141,7 +141,7 @@ impl KeyValueStoreManager { let _ = tx.send(card); } - Ok::<(), StorageError>(()) + Ok::<(), StoreError>(()) }); (watch_task, rx) } @@ -152,14 +152,14 @@ impl KeyValueStoreManager { bucket_ttl: Option, key: &Key, obj: &mut T, - ) -> anyhow::Result { + ) -> anyhow::Result { let obj_json = serde_json::to_string(obj)?; let bucket = self.0.get_or_create_bucket(bucket_name, bucket_ttl).await?; let outcome = bucket.insert(key, &obj_json, obj.revision()).await?; match outcome { - StorageOutcome::Created(revision) | StorageOutcome::Exists(revision) => { + StoreOutcome::Created(revision) | StoreOutcome::Exists(revision) => { obj.set_revision(revision); } } @@ -178,43 +178,43 @@ pub trait KeyValueBucket: Send { key: &Key, value: &str, revision: u64, - ) -> Result; + ) -> Result; /// Fetch an item from the key-value storage - async fn get(&self, key: &Key) -> Result, StorageError>; + async fn get(&self, key: &Key) -> Result, StoreError>; /// Delete an item from the bucket - async fn delete(&self, key: &Key) -> Result<(), StorageError>; + async fn delete(&self, key: &Key) -> Result<(), StoreError>; /// A stream of items inserted into the bucket. /// Every time the stream is polled it will either return a newly created entry, or block until /// such time. async fn watch( &self, - ) -> Result + Send + 'life0>>, StorageError>; + ) -> Result + Send + 'life0>>, StoreError>; - async fn entries(&self) -> Result, StorageError>; + async fn entries(&self) -> Result, StoreError>; } #[derive(Debug, Copy, Clone, Eq, PartialEq)] -pub enum StorageOutcome { +pub enum StoreOutcome { /// The operation succeeded and created a new entry with this revision. /// Note that "create" also means update, because each new revision is a "create". Created(u64), /// The operation did not do anything, the value was already present, with this revision. Exists(u64), } -impl fmt::Display for StorageOutcome { +impl fmt::Display for StoreOutcome { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - StorageOutcome::Created(revision) => write!(f, "Created at {revision}"), - StorageOutcome::Exists(revision) => write!(f, "Exists at {revision}"), + StoreOutcome::Created(revision) => write!(f, "Created at {revision}"), + StoreOutcome::Exists(revision) => write!(f, "Exists at {revision}"), } } } #[derive(thiserror::Error, Debug)] -pub enum StorageError { +pub enum StoreError { #[error("Could not find bucket '{0}'")] MissingBucket(String), @@ -293,12 +293,12 @@ mod tests { async fn test_memory_storage() -> anyhow::Result<()> { init(); - let s = Arc::new(MemoryStorage::new()); + let s = Arc::new(MemoryStore::new()); let s2 = Arc::clone(&s); let bucket = s.get_or_create_bucket(BUCKET_NAME, None).await?; let res = bucket.insert(&"test1".into(), "value1", 0).await?; - assert_eq!(res, StorageOutcome::Created(0)); + assert_eq!(res, StoreOutcome::Created(0)); let (got_first_tx, got_first_rx) = tokio::sync::oneshot::channel(); let ingress = tokio::spawn(async move { @@ -317,27 +317,27 @@ mod tests { let v = stream.next().await.unwrap(); assert_eq!(v, "value3".as_bytes()); - Ok::<_, StorageError>(()) + Ok::<_, StoreError>(()) }); - // MemoryStorage uses a HashMap with no inherent ordering, so we must ensure test1 is + // MemoryStore uses a HashMap with no inherent ordering, so we must ensure test1 is // fetched before test2 is inserted, otherwise they can come out in any order, and we // wouldn't be testing the watch behavior. got_first_rx.await?; let res = bucket.insert(&"test2".into(), "value2", 0).await?; - assert_eq!(res, StorageOutcome::Created(0)); + assert_eq!(res, StoreOutcome::Created(0)); // Repeat a key and revision. Ignored. let res = bucket.insert(&"test2".into(), "value2", 0).await?; - assert_eq!(res, StorageOutcome::Exists(0)); + assert_eq!(res, StoreOutcome::Exists(0)); // Increment revision let res = bucket.insert(&"test2".into(), "value2", 1).await?; - assert_eq!(res, StorageOutcome::Created(1)); + assert_eq!(res, StoreOutcome::Created(1)); let res = bucket.insert(&"test3".into(), "value3", 0).await?; - assert_eq!(res, StorageOutcome::Created(0)); + assert_eq!(res, StoreOutcome::Created(0)); // ingress exits once it has received all values let _ = ingress.await?; @@ -349,12 +349,12 @@ mod tests { async fn test_broadcast_stream() -> anyhow::Result<()> { init(); - let s: &'static _ = Box::leak(Box::new(MemoryStorage::new())); + let s: &'static _ = Box::leak(Box::new(MemoryStore::new())); let bucket: &'static _ = Box::leak(Box::new(s.get_or_create_bucket(BUCKET_NAME, None).await?)); let res = bucket.insert(&"test1".into(), "value1", 0).await?; - assert_eq!(res, StorageOutcome::Created(0)); + assert_eq!(res, StoreOutcome::Created(0)); let stream = bucket.watch().await?; let tap = TappableStream::new(stream, 10).await; diff --git a/lib/runtime/src/storage/key_value_store/etcd.rs b/lib/runtime/src/storage/key_value_store/etcd.rs index 5967dc888e..f50e809cd0 100644 --- a/lib/runtime/src/storage/key_value_store/etcd.rs +++ b/lib/runtime/src/storage/key_value_store/etcd.rs @@ -10,27 +10,27 @@ use async_stream::stream; use async_trait::async_trait; use etcd_client::{Compare, CompareOp, EventType, PutOptions, Txn, TxnOp, WatchOptions}; -use super::{KeyValueBucket, KeyValueStore, StorageError, StorageOutcome}; +use super::{KeyValueBucket, KeyValueStore, StoreError, StoreOutcome}; #[derive(Clone)] -pub struct EtcdStorage { +pub struct EtcdStore { client: Client, } -impl EtcdStorage { +impl EtcdStore { pub fn new(client: Client) -> Self { Self { client } } } #[async_trait] -impl KeyValueStore for EtcdStorage { +impl KeyValueStore for EtcdStore { /// A "bucket" in etcd is a path prefix async fn get_or_create_bucket( &self, bucket_name: &str, _ttl: Option, // TODO ttl not used yet - ) -> Result, StorageError> { + ) -> Result, StoreError> { Ok(self.get_bucket(bucket_name).await?.unwrap()) } @@ -39,7 +39,7 @@ impl KeyValueStore for EtcdStorage { async fn get_bucket( &self, bucket_name: &str, - ) -> Result>, StorageError> { + ) -> Result>, StoreError> { Ok(Some(Box::new(EtcdBucket { client: self.client.clone(), bucket_name: bucket_name.to_string(), @@ -66,7 +66,7 @@ impl KeyValueBucket for EtcdBucket { value: &str, // "version" in etcd speak. revision is a global cluster-wide value revision: u64, - ) -> Result { + ) -> Result { let version = revision; if version == 0 { self.create(key, value).await @@ -75,7 +75,7 @@ impl KeyValueBucket for EtcdBucket { } } - async fn get(&self, key: &Key) -> Result, StorageError> { + async fn get(&self, key: &Key) -> Result, StoreError> { let k = make_key(&self.bucket_name, key); tracing::trace!("etcd get: {k}"); @@ -83,7 +83,7 @@ impl KeyValueBucket for EtcdBucket { .client .kv_get(k, None) .await - .map_err(|e| StorageError::EtcdError(e.to_string()))?; + .map_err(|e| StoreError::EtcdError(e.to_string()))?; if kvs.is_empty() { return Ok(None); } @@ -91,20 +91,20 @@ impl KeyValueBucket for EtcdBucket { Ok(Some(val.into())) } - async fn delete(&self, key: &Key) -> Result<(), StorageError> { + async fn delete(&self, key: &Key) -> Result<(), StoreError> { let k = make_key(&self.bucket_name, key); tracing::trace!("etcd delete: {k}"); let _ = self .client .kv_delete(k, None) .await - .map_err(|e| StorageError::EtcdError(e.to_string()))?; + .map_err(|e| StoreError::EtcdError(e.to_string()))?; Ok(()) } async fn watch( &self, - ) -> Result + Send + 'life0>>, StorageError> + ) -> Result + Send + 'life0>>, StoreError> { let k = make_key(&self.bucket_name, &"".into()); tracing::trace!("etcd watch: {k}"); @@ -114,7 +114,7 @@ impl KeyValueBucket for EtcdBucket { .clone() .watch(k.as_bytes(), Some(WatchOptions::new().with_prefix())) .await - .map_err(|e| StorageError::EtcdError(e.to_string()))?; + .map_err(|e| StoreError::EtcdError(e.to_string()))?; let output = stream! { while let Ok(Some(resp)) = watch_stream.message().await { for e in resp.events() { @@ -128,7 +128,7 @@ impl KeyValueBucket for EtcdBucket { Ok(Box::pin(output)) } - async fn entries(&self) -> Result, StorageError> { + async fn entries(&self) -> Result, StoreError> { let k = make_key(&self.bucket_name, &"".into()); tracing::trace!("etcd entries: {k}"); @@ -136,7 +136,7 @@ impl KeyValueBucket for EtcdBucket { .client .kv_get_prefix(k) .await - .map_err(|e| StorageError::EtcdError(e.to_string()))?; + .map_err(|e| StoreError::EtcdError(e.to_string()))?; let out: HashMap = resp .into_iter() .map(|kv| { @@ -150,7 +150,7 @@ impl KeyValueBucket for EtcdBucket { } impl EtcdBucket { - async fn create(&self, key: &Key, value: &str) -> Result { + async fn create(&self, key: &Key, value: &str) -> Result { let k = make_key(&self.bucket_name, key); tracing::trace!("etcd create: {k}"); @@ -172,11 +172,11 @@ impl EtcdBucket { .kv_client() .txn(txn) .await - .map_err(|e| StorageError::EtcdError(e.to_string()))?; + .map_err(|e| StoreError::EtcdError(e.to_string()))?; if result.succeeded() { // Key was created successfully - return Ok(StorageOutcome::Created(1)); // version of new key is always 1 + return Ok(StoreOutcome::Created(1)); // version of new key is always 1 } // Key already existed, get its version @@ -185,10 +185,10 @@ impl EtcdBucket { && let Some(kv) = get_resp.kvs().first() { let version = kv.version() as u64; - return Ok(StorageOutcome::Exists(version)); + return Ok(StoreOutcome::Exists(version)); } // Shouldn't happen, but handle edge case - Err(StorageError::EtcdError( + Err(StoreError::EtcdError( "Unexpected transaction response".to_string(), )) } @@ -198,7 +198,7 @@ impl EtcdBucket { key: &Key, value: &str, revision: u64, - ) -> Result { + ) -> Result { let version = revision; let k = make_key(&self.bucket_name, key); tracing::trace!("etcd update: {k}"); @@ -207,9 +207,9 @@ impl EtcdBucket { .client .kv_get(k.clone(), None) .await - .map_err(|e| StorageError::EtcdError(e.to_string()))?; + .map_err(|e| StoreError::EtcdError(e.to_string()))?; if kvs.is_empty() { - return Err(StorageError::MissingKey(key.to_string())); + return Err(StoreError::MissingKey(key.to_string())); } let current_version = kvs.first().unwrap().version() as u64; if current_version != version + 1 { @@ -230,17 +230,17 @@ impl EtcdBucket { .client .kv_put_with_options(k, value, Some(put_options)) .await - .map_err(|e| StorageError::EtcdError(e.to_string()))?; + .map_err(|e| StoreError::EtcdError(e.to_string()))?; Ok(match put_resp.take_prev_key() { // Should this be an error? // The key was deleted between our get and put. We re-created it. // Version of new key is always 1. // - None => StorageOutcome::Created(1), + None => StoreOutcome::Created(1), // Expected case, success - Some(kv) if kv.version() as u64 == version + 1 => StorageOutcome::Created(version), + Some(kv) if kv.version() as u64 == version + 1 => StoreOutcome::Created(version), // Should this be an error? Something updated the version between our get and put - Some(kv) => StorageOutcome::Created(kv.version() as u64 + 1), + Some(kv) => StoreOutcome::Created(kv.version() as u64 + 1), }) } } @@ -269,9 +269,9 @@ mod concurrent_create_tests { }); } - async fn test_concurrent_create(drt: DistributedRuntime) -> Result<(), StorageError> { + async fn test_concurrent_create(drt: DistributedRuntime) -> Result<(), StoreError> { let etcd_client = drt.etcd_client().expect("etcd client should be available"); - let storage = EtcdStorage::new(etcd_client); + let storage = EtcdStore::new(etcd_client); // Create a bucket for testing let bucket = Arc::new(tokio::sync::Mutex::new( @@ -313,7 +313,7 @@ mod concurrent_create_tests { .await; match result { - Ok(StorageOutcome::Created(version)) => { + Ok(StoreOutcome::Created(version)) => { println!( "Worker {} successfully created key with version {}", worker_id, version @@ -322,7 +322,7 @@ mod concurrent_create_tests { *count += 1; Ok(version) } - Ok(StorageOutcome::Exists(version)) => { + Ok(StoreOutcome::Exists(version)) => { println!( "Worker {} found key already exists with version {}", worker_id, version diff --git a/lib/runtime/src/storage/key_value_store/mem.rs b/lib/runtime/src/storage/key_value_store/mem.rs index ac1b388fa8..cfa3dd5f64 100644 --- a/lib/runtime/src/storage/key_value_store/mem.rs +++ b/lib/runtime/src/storage/key_value_store/mem.rs @@ -14,21 +14,21 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use crate::storage::key_value_store::Key; -use super::{KeyValueBucket, KeyValueStore, StorageError, StorageOutcome}; +use super::{KeyValueBucket, KeyValueStore, StoreError, StoreOutcome}; #[derive(Clone)] -pub struct MemoryStorage { - inner: Arc, +pub struct MemoryStore { + inner: Arc, connection_id: u64, } -impl Default for MemoryStorage { +impl Default for MemoryStore { fn default() -> Self { Self::new() } } -struct MemoryStorageInner { +struct MemoryStoreInner { data: Mutex>, change_sender: UnboundedSender<(String, String)>, change_receiver: Mutex>, @@ -36,7 +36,7 @@ struct MemoryStorageInner { pub struct MemoryBucketRef { name: String, - inner: Arc, + inner: Arc, } struct MemoryBucket { @@ -51,11 +51,11 @@ impl MemoryBucket { } } -impl MemoryStorage { +impl MemoryStore { pub fn new() -> Self { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - MemoryStorage { - inner: Arc::new(MemoryStorageInner { + MemoryStore { + inner: Arc::new(MemoryStoreInner { data: Mutex::new(HashMap::new()), change_sender: tx, change_receiver: Mutex::new(rx), @@ -66,13 +66,13 @@ impl MemoryStorage { } #[async_trait] -impl KeyValueStore for MemoryStorage { +impl KeyValueStore for MemoryStore { async fn get_or_create_bucket( &self, bucket_name: &str, - // MemoryStorage doesn't respect TTL yet + // MemoryStore doesn't respect TTL yet _ttl: Option, - ) -> Result, StorageError> { + ) -> Result, StoreError> { let mut locked_data = self.inner.data.lock().await; // Ensure the bucket exists locked_data @@ -85,11 +85,11 @@ impl KeyValueStore for MemoryStorage { })) } - /// This operation cannot fail on MemoryStorage. Always returns Ok. + /// This operation cannot fail on MemoryStore. Always returns Ok. async fn get_bucket( &self, bucket_name: &str, - ) -> Result>, StorageError> { + ) -> Result>, StoreError> { let locked_data = self.inner.data.lock().await; match locked_data.get(bucket_name) { Some(_) => Ok(Some(Box::new(MemoryBucketRef { @@ -112,11 +112,11 @@ impl KeyValueBucket for MemoryBucketRef { key: &Key, value: &str, revision: u64, - ) -> Result { + ) -> Result { let mut locked_data = self.inner.data.lock().await; let mut b = locked_data.get_mut(&self.name); let Some(bucket) = b.as_mut() else { - return Err(StorageError::MissingBucket(self.name.to_string())); + return Err(StoreError::MissingBucket(self.name.to_string())); }; let outcome = match bucket.data.entry(key.to_string()) { Entry::Vacant(e) => { @@ -125,22 +125,22 @@ impl KeyValueBucket for MemoryBucketRef { .inner .change_sender .send((key.to_string(), value.to_string())); - StorageOutcome::Created(revision) + StoreOutcome::Created(revision) } Entry::Occupied(mut entry) => { let (rev, _v) = entry.get(); if *rev == revision { - StorageOutcome::Exists(revision) + StoreOutcome::Exists(revision) } else { entry.insert((revision, value.to_string())); - StorageOutcome::Created(revision) + StoreOutcome::Created(revision) } } }; Ok(outcome) } - async fn get(&self, key: &Key) -> Result, StorageError> { + async fn get(&self, key: &Key) -> Result, StoreError> { let locked_data = self.inner.data.lock().await; let Some(bucket) = locked_data.get(&self.name) else { return Ok(None); @@ -151,10 +151,10 @@ impl KeyValueBucket for MemoryBucketRef { .map(|(_, v)| bytes::Bytes::from(v.clone()))) } - async fn delete(&self, key: &Key) -> Result<(), StorageError> { + async fn delete(&self, key: &Key) -> Result<(), StoreError> { let mut locked_data = self.inner.data.lock().await; let Some(bucket) = locked_data.get_mut(&self.name) else { - return Err(StorageError::MissingBucket(self.name.to_string())); + return Err(StoreError::MissingBucket(self.name.to_string())); }; bucket.data.remove(&key.0); Ok(()) @@ -165,7 +165,7 @@ impl KeyValueBucket for MemoryBucketRef { /// Caller takes the lock so only a single caller may use this at once. async fn watch( &self, - ) -> Result + Send + 'life0>>, StorageError> + ) -> Result + Send + 'life0>>, StoreError> { Ok(Box::pin(async_stream::stream! { // All the existing ones first @@ -199,7 +199,7 @@ impl KeyValueBucket for MemoryBucketRef { })) } - async fn entries(&self) -> Result, StorageError> { + async fn entries(&self) -> Result, StoreError> { let locked_data = self.inner.data.lock().await; match locked_data.get(&self.name) { Some(bucket) => Ok(bucket @@ -207,7 +207,7 @@ impl KeyValueBucket for MemoryBucketRef { .iter() .map(|(k, (_rev, v))| (k.to_string(), bytes::Bytes::from(v.clone()))) .collect()), - None => Err(StorageError::MissingBucket(self.name.clone())), + None => Err(StoreError::MissingBucket(self.name.clone())), } } } diff --git a/lib/runtime/src/storage/key_value_store/nats.rs b/lib/runtime/src/storage/key_value_store/nats.rs index 7ebe4da54d..c8fcf5f988 100644 --- a/lib/runtime/src/storage/key_value_store/nats.rs +++ b/lib/runtime/src/storage/key_value_store/nats.rs @@ -9,10 +9,10 @@ use crate::{ use async_trait::async_trait; use futures::StreamExt; -use super::{KeyValueBucket, KeyValueStore, StorageError, StorageOutcome}; +use super::{KeyValueBucket, KeyValueStore, StoreError, StoreOutcome}; #[derive(Clone)] -pub struct NATSStorage { +pub struct NATSStore { client: Client, endpoint: EndpointId, } @@ -22,12 +22,12 @@ pub struct NATSBucket { } #[async_trait] -impl KeyValueStore for NATSStorage { +impl KeyValueStore for NATSStore { async fn get_or_create_bucket( &self, bucket_name: &str, ttl: Option, - ) -> Result, StorageError> { + ) -> Result, StoreError> { let name = Slug::slugify(bucket_name); let nats_store = self .get_or_create_key_value(&self.endpoint.namespace, &name, ttl) @@ -38,7 +38,7 @@ impl KeyValueStore for NATSStorage { async fn get_bucket( &self, bucket_name: &str, - ) -> Result>, StorageError> { + ) -> Result>, StoreError> { let name = Slug::slugify(bucket_name); match self.get_key_value(&self.endpoint.namespace, &name).await? { Some(nats_store) => Ok(Some(Box::new(NATSBucket { nats_store }))), @@ -51,9 +51,9 @@ impl KeyValueStore for NATSStorage { } } -impl NATSStorage { +impl NATSStore { pub fn new(client: Client, endpoint: EndpointId) -> Self { - NATSStorage { client, endpoint } + NATSStore { client, endpoint } } /// Get or create a key-value store (aka bucket) in NATS. @@ -66,7 +66,7 @@ impl NATSStorage { bucket_name: &Slug, // Delete entries older than this ttl: Option, - ) -> Result { + ) -> Result { if let Ok(Some(kv)) = self.get_key_value(namespace, bucket_name).await { return Ok(kv); } @@ -86,7 +86,7 @@ impl NATSStorage { ) .await; let nats_store = create_result - .map_err(|err| StorageError::KeyValueError(err.to_string(), bucket_name.clone()))?; + .map_err(|err| StoreError::KeyValueError(err.to_string(), bucket_name.clone()))?; tracing::debug!("Created bucket {bucket_name}"); Ok(nats_store) } @@ -95,7 +95,7 @@ impl NATSStorage { &self, namespace: &str, bucket_name: &Slug, - ) -> Result, StorageError> { + ) -> Result, StoreError> { let bucket_name = single_name(namespace, bucket_name); let js = self.client.jetstream(); @@ -106,7 +106,7 @@ impl NATSStorage { // bucket doesn't exist Ok(None) } - Err(err) => Err(StorageError::KeyValueError(err.to_string(), bucket_name)), + Err(err) => Err(StoreError::KeyValueError(err.to_string(), bucket_name)), } } } @@ -118,7 +118,7 @@ impl KeyValueBucket for NATSBucket { key: &Key, value: &str, revision: u64, - ) -> Result { + ) -> Result { if revision == 0 { self.create(key, value).await } else { @@ -126,29 +126,29 @@ impl KeyValueBucket for NATSBucket { } } - async fn get(&self, key: &Key) -> Result, StorageError> { + async fn get(&self, key: &Key) -> Result, StoreError> { self.nats_store .get(key) .await - .map_err(|e| StorageError::NATSError(e.to_string())) + .map_err(|e| StoreError::NATSError(e.to_string())) } - async fn delete(&self, key: &Key) -> Result<(), StorageError> { + async fn delete(&self, key: &Key) -> Result<(), StoreError> { self.nats_store .delete(key) .await - .map_err(|e| StorageError::NATSError(e.to_string())) + .map_err(|e| StoreError::NATSError(e.to_string())) } async fn watch( &self, - ) -> Result + Send + 'life0>>, StorageError> + ) -> Result + Send + 'life0>>, StoreError> { let watch_stream = self .nats_store .watch_all() .await - .map_err(|e| StorageError::NATSError(e.to_string()))?; + .map_err(|e| StoreError::NATSError(e.to_string()))?; // Map the `Entry` to `Entry.value` which is Bytes of the stored value. Ok(Box::pin( watch_stream.filter_map( @@ -168,12 +168,12 @@ impl KeyValueBucket for NATSBucket { )) } - async fn entries(&self) -> Result, StorageError> { + async fn entries(&self) -> Result, StoreError> { let mut key_stream = self .nats_store .keys() .await - .map_err(|e| StorageError::NATSError(e.to_string()))?; + .map_err(|e| StoreError::NATSError(e.to_string()))?; let mut out = HashMap::new(); while let Some(Ok(key)) = key_stream.next().await { if let Ok(Some(entry)) = self.nats_store.entry(&key).await { @@ -185,24 +185,24 @@ impl KeyValueBucket for NATSBucket { } impl NATSBucket { - async fn create(&self, key: &Key, value: &str) -> Result { + async fn create(&self, key: &Key, value: &str) -> Result { match self.nats_store.create(&key, value.to_string().into()).await { - Ok(revision) => Ok(StorageOutcome::Created(revision)), + Ok(revision) => Ok(StoreOutcome::Created(revision)), Err(err) if err.kind() == async_nats::jetstream::kv::CreateErrorKind::AlreadyExists => { // key exists, get the revsion match self.nats_store.entry(key).await { - Ok(Some(entry)) => Ok(StorageOutcome::Exists(entry.revision)), + Ok(Some(entry)) => Ok(StoreOutcome::Exists(entry.revision)), Ok(None) => { tracing::error!( %key, "Race condition, key deleted between create and fetch. Retry." ); - Err(StorageError::Retry) + Err(StoreError::Retry) } - Err(err) => Err(StorageError::NATSError(err.to_string())), + Err(err) => Err(StoreError::NATSError(err.to_string())), } } - Err(err) => Err(StorageError::NATSError(err.to_string())), + Err(err) => Err(StoreError::NATSError(err.to_string())), } } @@ -211,26 +211,26 @@ impl NATSBucket { key: &Key, value: &str, revision: u64, - ) -> Result { + ) -> Result { match self .nats_store .update(key, value.to_string().into(), revision) .await { - Ok(revision) => Ok(StorageOutcome::Created(revision)), + Ok(revision) => Ok(StoreOutcome::Created(revision)), Err(err) if err.kind() == async_nats::jetstream::kv::UpdateErrorKind::WrongLastRevision => { tracing::warn!(revision, %key, "Update WrongLastRevision, resync"); self.resync_update(key, value).await } - Err(err) => Err(StorageError::NATSError(err.to_string())), + Err(err) => Err(StoreError::NATSError(err.to_string())), } } /// We have the wrong revision for a key. Fetch it's entry to get the correct revision, /// and try the update again. - async fn resync_update(&self, key: &Key, value: &str) -> Result { + async fn resync_update(&self, key: &Key, value: &str) -> Result { match self.nats_store.entry(key).await { Ok(Some(entry)) => { // Re-try the update with new version number @@ -240,8 +240,8 @@ impl NATSBucket { .update(key, value.to_string().into(), next_rev) .await { - Ok(correct_revision) => Ok(StorageOutcome::Created(correct_revision)), - Err(err) => Err(StorageError::NATSError(format!( + Ok(correct_revision) => Ok(StoreOutcome::Created(correct_revision)), + Err(err) => Err(StoreError::NATSError(format!( "Error during update of key {key} after resync: {err}" ))), } @@ -252,7 +252,7 @@ impl NATSBucket { } Err(err) => { tracing::error!(%key, %err, "Failed fetching entry during resync"); - Err(StorageError::NATSError(err.to_string())) + Err(StoreError::NATSError(err.to_string())) } } }