Skip to content

Commit 23b38ea

Browse files
grahamkingziqifan617
authored andcommitted
feat: Introduce storage_client in DistributedRuntime (#3507)
Signed-off-by: Graham King <[email protected]>
1 parent aa65217 commit 23b38ea

File tree

13 files changed

+243
-178
lines changed

13 files changed

+243
-178
lines changed

lib/llm/src/grpc/service/kserve.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,11 @@ impl State {
5757
}
5858
}
5959

60-
pub fn new_with_etcd(manager: Arc<ModelManager>, etcd_client: Option<etcd::Client>) -> Self {
60+
pub fn new_with_etcd(manager: Arc<ModelManager>, etcd_client: etcd::Client) -> Self {
6161
Self {
6262
manager,
6363
metrics: Arc::new(Metrics::default()),
64-
etcd_client,
64+
etcd_client: Some(etcd_client),
6565
}
6666
}
6767

@@ -155,7 +155,10 @@ impl KserveServiceConfigBuilder {
155155
let config: KserveServiceConfig = self.build_internal()?;
156156

157157
let model_manager = Arc::new(ModelManager::new());
158-
let state = Arc::new(State::new_with_etcd(model_manager, config.etcd_client));
158+
let state = match config.etcd_client {
159+
Some(etcd_client) => Arc::new(State::new_with_etcd(model_manager, etcd_client)),
160+
None => Arc::new(State::new(model_manager)),
161+
};
159162

160163
// enable prometheus metrics
161164
let registry = metrics::Registry::new();

lib/llm/src/http/service/health.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,16 +52,12 @@ async fn live_handler(
5252
async fn health_handler(
5353
axum::extract::State(state): axum::extract::State<Arc<service_v2::State>>,
5454
) -> impl IntoResponse {
55-
let instances = if let Some(etcd_client) = state.etcd_client() {
56-
match list_all_instances(etcd_client).await {
57-
Ok(instances) => instances,
58-
Err(err) => {
59-
tracing::warn!("Failed to fetch instances from etcd: {}", err);
60-
vec![]
61-
}
55+
let instances = match list_all_instances(state.store()).await {
56+
Ok(instances) => instances,
57+
Err(err) => {
58+
tracing::warn!(%err, "Failed to fetch instances from store");
59+
vec![]
6260
}
63-
} else {
64-
vec![]
6561
};
6662

6763
let mut endpoints: Vec<String> = instances

lib/llm/src/http/service/service_v2.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,21 @@ use anyhow::Result;
1919
use axum_server::tls_rustls::RustlsConfig;
2020
use derive_builder::Builder;
2121
use dynamo_runtime::logging::make_request_span;
22+
use dynamo_runtime::storage::key_value_store::EtcdStore;
23+
use dynamo_runtime::storage::key_value_store::KeyValueStore;
24+
use dynamo_runtime::storage::key_value_store::MemoryStore;
2225
use dynamo_runtime::transports::etcd;
2326
use std::net::SocketAddr;
2427
use tokio::task::JoinHandle;
2528
use tokio_util::sync::CancellationToken;
2629
use tower_http::trace::TraceLayer;
2730

2831
/// HTTP service shared state
29-
#[derive(Default)]
3032
pub struct State {
3133
metrics: Arc<Metrics>,
3234
manager: Arc<ModelManager>,
3335
etcd_client: Option<etcd::Client>,
36+
store: Arc<dyn KeyValueStore>,
3437
flags: StateFlags,
3538
}
3639

@@ -76,6 +79,7 @@ impl State {
7679
manager,
7780
metrics: Arc::new(Metrics::default()),
7881
etcd_client: None,
82+
store: Arc::new(MemoryStore::new()),
7983
flags: StateFlags {
8084
chat_endpoints_enabled: AtomicBool::new(false),
8185
cmpl_endpoints_enabled: AtomicBool::new(false),
@@ -85,11 +89,12 @@ impl State {
8589
}
8690
}
8791

88-
pub fn new_with_etcd(manager: Arc<ModelManager>, etcd_client: Option<etcd::Client>) -> Self {
92+
pub fn new_with_etcd(manager: Arc<ModelManager>, etcd_client: etcd::Client) -> Self {
8993
Self {
9094
manager,
9195
metrics: Arc::new(Metrics::default()),
92-
etcd_client,
96+
store: Arc::new(EtcdStore::new(etcd_client.clone())),
97+
etcd_client: Some(etcd_client),
9398
flags: StateFlags {
9499
chat_endpoints_enabled: AtomicBool::new(false),
95100
cmpl_endpoints_enabled: AtomicBool::new(false),
@@ -115,6 +120,10 @@ impl State {
115120
self.etcd_client.as_ref()
116121
}
117122

123+
pub fn store(&self) -> Arc<dyn KeyValueStore> {
124+
self.store.clone()
125+
}
126+
118127
// TODO
119128
pub fn sse_keep_alive(&self) -> Option<Duration> {
120129
None
@@ -294,9 +303,10 @@ impl HttpServiceConfigBuilder {
294303
let config: HttpServiceConfig = self.build_internal()?;
295304

296305
let model_manager = Arc::new(ModelManager::new());
297-
let etcd_client = config.etcd_client;
298-
let state = Arc::new(State::new_with_etcd(model_manager, etcd_client));
299-
306+
let state = match config.etcd_client {
307+
Some(etcd_client) => Arc::new(State::new_with_etcd(model_manager, etcd_client)),
308+
None => Arc::new(State::new(model_manager)),
309+
};
300310
state
301311
.flags
302312
.set(&EndpointType::Chat, config.enable_chat_endpoints);

lib/llm/src/local_model.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use dynamo_runtime::storage::key_value_store::Key;
1212
use dynamo_runtime::traits::DistributedRuntimeProvider;
1313
use dynamo_runtime::{
1414
component::Endpoint,
15-
storage::key_value_store::{EtcdStorage, KeyValueStore, KeyValueStoreManager},
15+
storage::key_value_store::{EtcdStore, KeyValueStore, KeyValueStoreManager},
1616
};
1717

1818
use crate::entrypoint::RouterConfig;
@@ -409,7 +409,7 @@ impl LocalModel {
409409
self.card.move_to_nats(nats_client.clone()).await?;
410410

411411
// Publish the Model Deployment Card to KV store
412-
let kvstore: Box<dyn KeyValueStore> = Box::new(EtcdStorage::new(etcd_client.clone()));
412+
let kvstore: Box<dyn KeyValueStore> = Box::new(EtcdStore::new(etcd_client.clone()));
413413
let card_store = Arc::new(KeyValueStoreManager::new(kvstore));
414414
let lease_id = endpoint.drt().primary_lease().map(|l| l.id()).unwrap_or(0);
415415
let key = Key::from_raw(endpoint.unique_path(lease_id));

lib/llm/src/model_card.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use anyhow::{Context, Result};
2323
use derive_builder::Builder;
2424
use dynamo_runtime::DistributedRuntime;
2525
use dynamo_runtime::storage::key_value_store::{
26-
EtcdStorage, Key, KeyValueStore, KeyValueStoreManager,
26+
EtcdStore, Key, KeyValueStore, KeyValueStoreManager,
2727
};
2828
use dynamo_runtime::{slug::Slug, storage::key_value_store::Versioned, transports::nats};
2929
use serde::{Deserialize, Serialize};
@@ -457,7 +457,7 @@ impl ModelDeploymentCard {
457457
// Should be impossible because we only get here on an etcd event
458458
anyhow::bail!("Missing etcd_client");
459459
};
460-
let store: Box<dyn KeyValueStore> = Box::new(EtcdStorage::new(etcd_client));
460+
let store: Box<dyn KeyValueStore> = Box::new(EtcdStore::new(etcd_client));
461461
let card_store = Arc::new(KeyValueStoreManager::new(store));
462462
let Some(mut card) = card_store
463463
.load::<ModelDeploymentCard>(ROOT_PATH, mdc_key)

lib/runtime/src/component.rs

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
//!
3030
//! TODO: Top-level Overview of Endpoints/Functions
3131
32+
use std::fmt;
33+
3234
use crate::{
3335
config::HealthStatus,
3436
discovery::Lease,
@@ -70,7 +72,7 @@ pub mod service;
7072

7173
pub use client::{Client, InstanceSource};
7274

73-
/// The root etcd path where each instance registers itself in etcd.
75+
/// The root key-value path where each instance registers itself in.
7476
/// An instance is namespace+component+endpoint+lease_id and must be unique.
7577
pub const INSTANCE_ROOT_PATH: &str = "v1/instances";
7678

@@ -91,7 +93,7 @@ pub struct Registry {
9193
inner: Arc<tokio::sync::Mutex<RegistryInner>>,
9294
}
9395

94-
#[derive(Debug, Clone, Serialize, Deserialize)]
96+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
9597
pub struct Instance {
9698
pub component: String,
9799
pub endpoint: String,
@@ -113,6 +115,30 @@ impl Instance {
113115
}
114116
}
115117

118+
impl fmt::Display for Instance {
119+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120+
write!(
121+
f,
122+
"{}/{}/{}/{}",
123+
self.namespace, self.component, self.endpoint, self.instance_id
124+
)
125+
}
126+
}
127+
128+
/// Sort by string name
129+
impl std::cmp::Ord for Instance {
130+
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
131+
self.to_string().cmp(&other.to_string())
132+
}
133+
}
134+
135+
impl PartialOrd for Instance {
136+
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
137+
// Since Ord is fully implemented, the comparison is always total.
138+
Some(self.cmp(other))
139+
}
140+
}
141+
116142
/// A [Component] a discoverable entity in the distributed runtime.
117143
/// You can host [Endpoint] on a [Component] by first creating
118144
/// a [Service] then adding one or more [Endpoint] to the [Service].
@@ -197,8 +223,8 @@ impl MetricsRegistry for Component {
197223
}
198224

199225
impl Component {
200-
/// The component part of an instance path in etcd.
201-
pub fn etcd_root(&self) -> String {
226+
/// The component part of an instance path in key-value store.
227+
pub fn instance_root(&self) -> String {
202228
let ns = self.namespace.name();
203229
let cp = &self.name;
204230
format!("{INSTANCE_ROOT_PATH}/{ns}/{cp}")
@@ -240,27 +266,23 @@ impl Component {
240266
}
241267

242268
pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
243-
let Some(etcd_client) = self.drt.etcd_client() else {
269+
let client = self.drt.store();
270+
let Some(bucket) = client.get_bucket(&self.instance_root()).await? else {
244271
return Ok(vec![]);
245272
};
246-
let mut out = vec![];
247-
// The extra slash is important to only list exact component matches, not substrings.
248-
for kv in etcd_client
249-
.kv_get_prefix(format!("{}/", self.etcd_root()))
250-
.await?
251-
{
252-
let val = match serde_json::from_slice::<Instance>(kv.value()) {
273+
let entries = bucket.entries().await?;
274+
let mut instances = Vec::with_capacity(entries.len());
275+
for (name, bytes) in entries.into_iter() {
276+
let val = match serde_json::from_slice::<Instance>(&bytes) {
253277
Ok(val) => val,
254278
Err(err) => {
255-
anyhow::bail!(
256-
"Error converting etcd response to Instance: {err}. {}",
257-
kv.value_str()?
258-
);
279+
anyhow::bail!("Error converting storage response to Instance: {err}. {name}",);
259280
}
260281
};
261-
out.push(val);
282+
instances.push(val);
262283
}
263-
Ok(out)
284+
instances.sort();
285+
Ok(instances)
264286
}
265287

266288
/// Scrape ServiceSet, which contains NATS stats as well as user defined stats
@@ -445,7 +467,7 @@ impl Endpoint {
445467

446468
/// The endpoint part of an instance path in etcd
447469
pub fn etcd_root(&self) -> String {
448-
let component_path = self.component.etcd_root();
470+
let component_path = self.component.instance_root();
449471
let endpoint_name = &self.name;
450472
format!("{component_path}/{endpoint_name}")
451473
}

lib/runtime/src/distributed.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
pub use crate::component::Component;
5+
use crate::storage::key_value_store::{EtcdStore, KeyValueStore, MemoryStore};
56
use crate::transports::nats::DRTNatsClientPrometheusMetrics;
67
use crate::{
78
ErrorContext, RuntimeCallback,
@@ -44,10 +45,14 @@ impl DistributedRuntime {
4445

4546
let runtime_clone = runtime.clone();
4647

47-
let etcd_client = if is_static {
48-
None
48+
let (etcd_client, store) = if is_static {
49+
let store: Arc<dyn KeyValueStore> = Arc::new(MemoryStore::new());
50+
(None, store)
4951
} else {
50-
Some(etcd::Client::new(etcd_config.clone(), runtime_clone).await?)
52+
let etcd_client = etcd::Client::new(etcd_config.clone(), runtime_clone).await?;
53+
let store: Arc<dyn KeyValueStore> = Arc::new(EtcdStore::new(etcd_client.clone()));
54+
55+
(Some(etcd_client), store)
5156
};
5257

5358
let nats_client = nats_config.clone().connect().await?;
@@ -77,6 +82,7 @@ impl DistributedRuntime {
7782
let distributed_runtime = Self {
7883
runtime,
7984
etcd_client,
85+
store,
8086
nats_client,
8187
tcp_server: Arc::new(OnceCell::new()),
8288
system_status_server: Arc::new(OnceLock::new()),
@@ -270,6 +276,12 @@ impl DistributedRuntime {
270276
self.etcd_client.clone()
271277
}
272278

279+
/// An interface to store things. Will eventually replace `etcd_client`.
280+
/// Currently does key-value, but will grow to include whatever we need to store.
281+
pub fn store(&self) -> Arc<dyn KeyValueStore> {
282+
self.store.clone()
283+
}
284+
273285
pub fn child_token(&self) -> CancellationToken {
274286
self.runtime.child_token()
275287
}

lib/runtime/src/instances.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,28 +7,28 @@
77
//! the entire distributed system, complementing the component-specific
88
//! instance listing in `component.rs`.
99
10+
use std::sync::Arc;
11+
1012
use crate::component::{INSTANCE_ROOT_PATH, Instance};
13+
use crate::storage::key_value_store::KeyValueStore;
1114
use crate::transports::etcd::Client as EtcdClient;
1215

13-
pub async fn list_all_instances(etcd_client: &EtcdClient) -> anyhow::Result<Vec<Instance>> {
14-
let mut instances = Vec::new();
16+
pub async fn list_all_instances(client: Arc<dyn KeyValueStore>) -> anyhow::Result<Vec<Instance>> {
17+
let Some(bucket) = client.get_bucket(INSTANCE_ROOT_PATH).await? else {
18+
return Ok(vec![]);
19+
};
1520

16-
for kv in etcd_client
17-
.kv_get_prefix(format!("{}/", INSTANCE_ROOT_PATH))
18-
.await?
19-
{
20-
match serde_json::from_slice::<Instance>(kv.value()) {
21+
let entries = bucket.entries().await?;
22+
let mut instances = Vec::with_capacity(entries.len());
23+
for (name, bytes) in entries.into_iter() {
24+
match serde_json::from_slice::<Instance>(&bytes) {
2125
Ok(instance) => instances.push(instance),
2226
Err(err) => {
23-
tracing::warn!(
24-
"Failed to parse instance from etcd: {}. Key: {}, Value: {}",
25-
err,
26-
kv.key_str().unwrap_or("invalid_key"),
27-
kv.value_str().unwrap_or("invalid_value")
28-
);
27+
tracing::warn!(%err, key = name, "Failed to parse instance from storage");
2928
}
3029
}
3130
}
31+
instances.sort();
3232

3333
Ok(instances)
3434
}

lib/runtime/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ pub use system_health::{HealthCheckTarget, SystemHealth};
5151
pub use tokio_util::sync::CancellationToken;
5252
pub use worker::Worker;
5353

54-
use crate::metrics::prometheus_names::distributed_runtime;
54+
use crate::{
55+
metrics::prometheus_names::distributed_runtime, storage::key_value_store::KeyValueStore,
56+
};
5557

5658
use component::{Endpoint, InstanceSource};
5759
use utils::GracefulShutdownTracker;
@@ -152,6 +154,7 @@ pub struct DistributedRuntime {
152154
// we might consider a unifed transport manager here
153155
etcd_client: Option<transports::etcd::Client>,
154156
nats_client: transports::nats::Client,
157+
store: Arc<dyn KeyValueStore>,
155158
tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
156159
system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
157160

0 commit comments

Comments
 (0)