Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions lib/llm/src/grpc/service/kserve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ impl State {
}
}

pub fn new_with_etcd(manager: Arc<ModelManager>, etcd_client: Option<etcd::Client>) -> Self {
pub fn new_with_etcd(manager: Arc<ModelManager>, etcd_client: etcd::Client) -> Self {
Self {
manager,
metrics: Arc::new(Metrics::default()),
etcd_client,
etcd_client: Some(etcd_client),
}
}

Expand Down Expand Up @@ -155,7 +155,10 @@ impl KserveServiceConfigBuilder {
let config: KserveServiceConfig = self.build_internal()?;

let model_manager = Arc::new(ModelManager::new());
let state = Arc::new(State::new_with_etcd(model_manager, config.etcd_client));
let state = match config.etcd_client {
Some(etcd_client) => Arc::new(State::new_with_etcd(model_manager, etcd_client)),
None => Arc::new(State::new(model_manager)),
};

// enable prometheus metrics
let registry = metrics::Registry::new();
Expand Down
14 changes: 5 additions & 9 deletions lib/llm/src/http/service/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,12 @@ async fn live_handler(
async fn health_handler(
axum::extract::State(state): axum::extract::State<Arc<service_v2::State>>,
) -> impl IntoResponse {
let instances = if let Some(etcd_client) = state.etcd_client() {
match list_all_instances(etcd_client).await {
Ok(instances) => instances,
Err(err) => {
tracing::warn!("Failed to fetch instances from etcd: {}", err);
vec![]
}
let instances = match list_all_instances(state.store()).await {
Ok(instances) => instances,
Err(err) => {
tracing::warn!(%err, "Failed to fetch instances from store");
vec![]
}
} else {
vec![]
};

let mut endpoints: Vec<String> = instances
Expand Down
22 changes: 16 additions & 6 deletions lib/llm/src/http/service/service_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,21 @@ use anyhow::Result;
use axum_server::tls_rustls::RustlsConfig;
use derive_builder::Builder;
use dynamo_runtime::logging::make_request_span;
use dynamo_runtime::storage::key_value_store::EtcdStore;
use dynamo_runtime::storage::key_value_store::KeyValueStore;
use dynamo_runtime::storage::key_value_store::MemoryStore;
use dynamo_runtime::transports::etcd;
use std::net::SocketAddr;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tower_http::trace::TraceLayer;

/// HTTP service shared state
#[derive(Default)]
pub struct State {
metrics: Arc<Metrics>,
manager: Arc<ModelManager>,
etcd_client: Option<etcd::Client>,
store: Arc<dyn KeyValueStore>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already merged but for future PRs:

I'm looking at this and I'm thinking, wait, couldn't you set etc_client as etcd::Client, and also store as EtcdStore (which contains etcd::Client)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh right, I see that you're doing that in 92 (double storing etcd).

It would be good to see a comment saying something like
// Right now I'll store both during this transition phase. In the next PR I will get rid of etcd_client and just use store.

flags: StateFlags,
}

Expand Down Expand Up @@ -76,6 +79,7 @@ impl State {
manager,
metrics: Arc::new(Metrics::default()),
etcd_client: None,
store: Arc::new(MemoryStore::new()),
flags: StateFlags {
chat_endpoints_enabled: AtomicBool::new(false),
cmpl_endpoints_enabled: AtomicBool::new(false),
Expand All @@ -85,11 +89,12 @@ impl State {
}
}

pub fn new_with_etcd(manager: Arc<ModelManager>, etcd_client: Option<etcd::Client>) -> Self {
pub fn new_with_etcd(manager: Arc<ModelManager>, etcd_client: etcd::Client) -> Self {
Self {
manager,
metrics: Arc::new(Metrics::default()),
etcd_client,
store: Arc::new(EtcdStore::new(etcd_client.clone())),
etcd_client: Some(etcd_client),
flags: StateFlags {
chat_endpoints_enabled: AtomicBool::new(false),
cmpl_endpoints_enabled: AtomicBool::new(false),
Expand All @@ -115,6 +120,10 @@ impl State {
self.etcd_client.as_ref()
}

pub fn store(&self) -> Arc<dyn KeyValueStore> {
self.store.clone()
}

// TODO
pub fn sse_keep_alive(&self) -> Option<Duration> {
None
Expand Down Expand Up @@ -294,9 +303,10 @@ impl HttpServiceConfigBuilder {
let config: HttpServiceConfig = self.build_internal()?;

let model_manager = Arc::new(ModelManager::new());
let etcd_client = config.etcd_client;
let state = Arc::new(State::new_with_etcd(model_manager, etcd_client));

let state = match config.etcd_client {
Some(etcd_client) => Arc::new(State::new_with_etcd(model_manager, etcd_client)),
None => Arc::new(State::new(model_manager)),
};
state
.flags
.set(&EndpointType::Chat, config.enable_chat_endpoints);
Expand Down
4 changes: 2 additions & 2 deletions lib/llm/src/local_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use dynamo_runtime::storage::key_value_store::Key;
use dynamo_runtime::traits::DistributedRuntimeProvider;
use dynamo_runtime::{
component::Endpoint,
storage::key_value_store::{EtcdStorage, KeyValueStore, KeyValueStoreManager},
storage::key_value_store::{EtcdStore, KeyValueStore, KeyValueStoreManager},
};

use crate::entrypoint::RouterConfig;
Expand Down Expand Up @@ -409,7 +409,7 @@ impl LocalModel {
self.card.move_to_nats(nats_client.clone()).await?;

// Publish the Model Deployment Card to KV store
let kvstore: Box<dyn KeyValueStore> = Box::new(EtcdStorage::new(etcd_client.clone()));
let kvstore: Box<dyn KeyValueStore> = Box::new(EtcdStore::new(etcd_client.clone()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AI is taking over the word kv and in the future this variable is probably best renamed as just keyval_store

let card_store = Arc::new(KeyValueStoreManager::new(kvstore));
let lease_id = endpoint.drt().primary_lease().map(|l| l.id()).unwrap_or(0);
let key = Key::from_raw(endpoint.unique_path(lease_id));
Expand Down
4 changes: 2 additions & 2 deletions lib/llm/src/model_card.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use anyhow::{Context, Result};
use derive_builder::Builder;
use dynamo_runtime::DistributedRuntime;
use dynamo_runtime::storage::key_value_store::{
EtcdStorage, Key, KeyValueStore, KeyValueStoreManager,
EtcdStore, Key, KeyValueStore, KeyValueStoreManager,
};
use dynamo_runtime::{slug::Slug, storage::key_value_store::Versioned, transports::nats};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -457,7 +457,7 @@ impl ModelDeploymentCard {
// Should be impossible because we only get here on an etcd event
anyhow::bail!("Missing etcd_client");
};
let store: Box<dyn KeyValueStore> = Box::new(EtcdStorage::new(etcd_client));
let store: Box<dyn KeyValueStore> = Box::new(EtcdStore::new(etcd_client));
let card_store = Arc::new(KeyValueStoreManager::new(store));
let Some(mut card) = card_store
.load::<ModelDeploymentCard>(ROOT_PATH, mdc_key)
Expand Down
60 changes: 41 additions & 19 deletions lib/runtime/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
//!
//! TODO: Top-level Overview of Endpoints/Functions

use std::fmt;

use crate::{
config::HealthStatus,
discovery::Lease,
Expand Down Expand Up @@ -70,7 +72,7 @@ pub mod service;

pub use client::{Client, InstanceSource};

/// The root etcd path where each instance registers itself in etcd.
/// The root key-value path where each instance registers itself in.
/// An instance is namespace+component+endpoint+lease_id and must be unique.
pub const INSTANCE_ROOT_PATH: &str = "v1/instances";

Expand All @@ -91,7 +93,7 @@ pub struct Registry {
inner: Arc<tokio::sync::Mutex<RegistryInner>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Instance {
pub component: String,
pub endpoint: String,
Expand All @@ -113,6 +115,30 @@ impl Instance {
}
}

impl fmt::Display for Instance {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}/{}/{}/{}",
self.namespace, self.component, self.endpoint, self.instance_id
)
}
}

/// Sort by string name
impl std::cmp::Ord for Instance {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.to_string().cmp(&other.to_string())
}
}

impl PartialOrd for Instance {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
// Since Ord is fully implemented, the comparison is always total.
Some(self.cmp(other))
}
}

/// A [Component] a discoverable entity in the distributed runtime.
/// You can host [Endpoint] on a [Component] by first creating
/// a [Service] then adding one or more [Endpoint] to the [Service].
Expand Down Expand Up @@ -197,8 +223,8 @@ impl MetricsRegistry for Component {
}

impl Component {
/// The component part of an instance path in etcd.
pub fn etcd_root(&self) -> String {
/// The component part of an instance path in key-value store.
pub fn instance_root(&self) -> String {
let ns = self.namespace.name();
let cp = &self.name;
format!("{INSTANCE_ROOT_PATH}/{ns}/{cp}")
Expand Down Expand Up @@ -240,27 +266,23 @@ impl Component {
}

pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
let Some(etcd_client) = self.drt.etcd_client() else {
let client = self.drt.store();
let Some(bucket) = client.get_bucket(&self.instance_root()).await? else {
return Ok(vec![]);
};
let mut out = vec![];
// The extra slash is important to only list exact component matches, not substrings.
for kv in etcd_client
.kv_get_prefix(format!("{}/", self.etcd_root()))
.await?
{
let val = match serde_json::from_slice::<Instance>(kv.value()) {
let entries = bucket.entries().await?;
let mut instances = Vec::with_capacity(entries.len());
for (name, bytes) in entries.into_iter() {
let val = match serde_json::from_slice::<Instance>(&bytes) {
Ok(val) => val,
Err(err) => {
anyhow::bail!(
"Error converting etcd response to Instance: {err}. {}",
kv.value_str()?
);
anyhow::bail!("Error converting storage response to Instance: {err}. {name}",);
}
};
out.push(val);
instances.push(val);
}
Ok(out)
instances.sort();
Ok(instances)
}

/// Scrape ServiceSet, which contains NATS stats as well as user defined stats
Expand Down Expand Up @@ -445,7 +467,7 @@ impl Endpoint {

/// The endpoint part of an instance path in etcd
pub fn etcd_root(&self) -> String {
let component_path = self.component.etcd_root();
let component_path = self.component.instance_root();
let endpoint_name = &self.name;
format!("{component_path}/{endpoint_name}")
}
Expand Down
18 changes: 15 additions & 3 deletions lib/runtime/src/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

pub use crate::component::Component;
use crate::storage::key_value_store::{EtcdStore, KeyValueStore, MemoryStore};
use crate::transports::nats::DRTNatsClientPrometheusMetrics;
use crate::{
ErrorContext, RuntimeCallback,
Expand Down Expand Up @@ -44,10 +45,14 @@ impl DistributedRuntime {

let runtime_clone = runtime.clone();

let etcd_client = if is_static {
None
let (etcd_client, store) = if is_static {
let store: Arc<dyn KeyValueStore> = Arc::new(MemoryStore::new());
(None, store)
} else {
Some(etcd::Client::new(etcd_config.clone(), runtime_clone).await?)
let etcd_client = etcd::Client::new(etcd_config.clone(), runtime_clone).await?;
let store: Arc<dyn KeyValueStore> = Arc::new(EtcdStore::new(etcd_client.clone()));

(Some(etcd_client), store)
};

let nats_client = nats_config.clone().connect().await?;
Expand Down Expand Up @@ -77,6 +82,7 @@ impl DistributedRuntime {
let distributed_runtime = Self {
runtime,
etcd_client,
store,
nats_client,
tcp_server: Arc::new(OnceCell::new()),
system_status_server: Arc::new(OnceLock::new()),
Expand Down Expand Up @@ -270,6 +276,12 @@ impl DistributedRuntime {
self.etcd_client.clone()
}

/// An interface to store things. Will eventually replace `etcd_client`.
/// Currently does key-value, but will grow to include whatever we need to store.
pub fn store(&self) -> Arc<dyn KeyValueStore> {
self.store.clone()
}

pub fn child_token(&self) -> CancellationToken {
self.runtime.child_token()
}
Expand Down
26 changes: 13 additions & 13 deletions lib/runtime/src/instances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,28 @@
//! the entire distributed system, complementing the component-specific
//! instance listing in `component.rs`.
use std::sync::Arc;

use crate::component::{INSTANCE_ROOT_PATH, Instance};
use crate::storage::key_value_store::KeyValueStore;
use crate::transports::etcd::Client as EtcdClient;

pub async fn list_all_instances(etcd_client: &EtcdClient) -> anyhow::Result<Vec<Instance>> {
let mut instances = Vec::new();
pub async fn list_all_instances(client: Arc<dyn KeyValueStore>) -> anyhow::Result<Vec<Instance>> {
let Some(bucket) = client.get_bucket(INSTANCE_ROOT_PATH).await? else {
return Ok(vec![]);
};

for kv in etcd_client
.kv_get_prefix(format!("{}/", INSTANCE_ROOT_PATH))
.await?
{
match serde_json::from_slice::<Instance>(kv.value()) {
let entries = bucket.entries().await?;
let mut instances = Vec::with_capacity(entries.len());
for (name, bytes) in entries.into_iter() {
match serde_json::from_slice::<Instance>(&bytes) {
Ok(instance) => instances.push(instance),
Err(err) => {
tracing::warn!(
"Failed to parse instance from etcd: {}. Key: {}, Value: {}",
err,
kv.key_str().unwrap_or("invalid_key"),
kv.value_str().unwrap_or("invalid_value")
);
tracing::warn!(%err, key = name, "Failed to parse instance from storage");
}
}
}
instances.sort();

Ok(instances)
}
5 changes: 4 additions & 1 deletion lib/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ pub use system_health::{HealthCheckTarget, SystemHealth};
pub use tokio_util::sync::CancellationToken;
pub use worker::Worker;

use crate::metrics::prometheus_names::distributed_runtime;
use crate::{
metrics::prometheus_names::distributed_runtime, storage::key_value_store::KeyValueStore,
};

use component::{Endpoint, InstanceSource};
use utils::GracefulShutdownTracker;
Expand Down Expand Up @@ -152,6 +154,7 @@ pub struct DistributedRuntime {
// we might consider a unifed transport manager here
etcd_client: Option<transports::etcd::Client>,
nats_client: transports::nats::Client,
store: Arc<dyn KeyValueStore>,
tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,

Expand Down
Loading
Loading