Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions components/metrics/src/bin/mock_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ use dynamo_llm::kv_router::{
protocols::ForwardPassMetrics, scheduler::KVHitRateEvent, KV_HIT_RATE_SUBJECT,
};
use dynamo_runtime::{
component::{service::EndpointStats, Namespace},
logging,
pipeline::{
async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut,
ResponseStream, SingleIn,
},
entity::{EntityChain, service::EndpointStats, Namespace},
protocols::annotated::Annotated,
stream,
traits::events::EventPublisher,
Expand Down Expand Up @@ -136,8 +136,8 @@ async fn backend(runtime: DistributedRuntime) -> Result<()> {
.service_builder()
.create()
.await?;
let endpoint = component.endpoint("my_endpoint");
tracing::info!("Starting Mock Worker on Endpoint: {}", endpoint.path());
let endpoint = component.endpoint("my_endpoint")?;
tracing::info!("Starting Mock Worker on Endpoint: {}", endpoint);

// Spawn background task for publishing KV hit rate events
let namespace_clone = namespace.clone();
Expand Down
2 changes: 1 addition & 1 deletion components/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ use dynamo_llm::kv_router::scheduler::Endpoint;
use dynamo_llm::kv_router::scoring::ProcessedEndpoints;

use dynamo_runtime::{
distributed::Component, error, service::EndpointInfo, utils::Duration, Result,
error, service::EndpointInfo, utils::Duration, Result, entity::Component
};

/// Configuration for metrics collection mode
Expand Down
17 changes: 9 additions & 8 deletions components/metrics/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use clap::Parser;
use dynamo_llm::kv_router::scheduler::KVHitRateEvent;
use dynamo_llm::kv_router::KV_HIT_RATE_SUBJECT;
use dynamo_runtime::{
entity::EntityChain,
error, logging,
traits::events::{EventPublisher, EventSubscriber},
utils::{Duration, Instant},
Expand Down Expand Up @@ -119,11 +120,11 @@ async fn app(runtime: Runtime) -> Result<()> {

let drt = DistributedRuntime::from_settings(runtime.clone()).await?;

let namespace = drt.namespace(args.namespace)?;
let namespace = drt.namespace(&args.namespace)?;
let component = namespace.component("count")?;

// Create unique instance of Count
let key = format!("{}/instance", component.etcd_root());
let key = format!("{}/instance", component);
tracing::debug!("Creating unique instance of Count at {key}");
drt.etcd_client()
.expect("Unreachable because of DistributedRuntime::from_settings above")
Expand All @@ -132,11 +133,11 @@ async fn app(runtime: Runtime) -> Result<()> {
.context("Unable to create unique instance of Count; possibly one already exists")?;

let target_component = namespace.component(&config.component_name)?;
let target_endpoint = target_component.endpoint(&config.endpoint_name);
let target_endpoint = target_component.endpoint(&config.endpoint_name)?;

let service_path = target_endpoint.path();
let service_subject = target_endpoint.subject();
tracing::info!("Scraping endpoint {service_path} for stats");
// let service_path = target_endpoint.path();
let service_subject = target_endpoint.to_descriptor().identifier().slug().to_string();
tracing::info!("Scraping endpoint {service_subject} for stats");

// Safety: DistributedRuntime::from_settings ensures this is Some
let token = drt.primary_lease().unwrap().child_token();
Expand Down Expand Up @@ -224,14 +225,14 @@ async fn app(runtime: Runtime) -> Result<()> {
let endpoints =
collect_endpoints(&target_component, &service_subject, scrape_timeout).await?;
if endpoints.is_empty() {
tracing::warn!("No endpoints found matching {service_path}");
tracing::warn!("No endpoints found matching {service_subject}");
continue;
}

let metrics = extract_metrics(&endpoints);
let processed = postprocess_metrics(&metrics, &endpoints);
if processed.endpoints.is_empty() {
tracing::warn!("No metrics found matching {service_path}");
tracing::warn!("No metrics found matching {service_subject}");
} else {
tracing::info!("Aggregated metrics: {processed:?}");
}
Expand Down
3 changes: 2 additions & 1 deletion components/router/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use dynamo_llm::kv_router::{
};
use dynamo_runtime::{
logging, pipeline::network::Ingress, DistributedRuntime, Result, Runtime, Worker,
entity::EntityChain
};

#[derive(Parser)]
Expand Down Expand Up @@ -73,7 +74,7 @@ async fn app(runtime: Runtime) -> Result<()> {
.service_builder()
.create()
.await?
.endpoint("generate")
.endpoint("generate")?
.endpoint_builder()
.handler(router)
.start()
Expand Down
4 changes: 2 additions & 2 deletions launch/dynamo-run/src/input/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use dynamo_runtime::engine::AsyncEngineStream;
use dynamo_runtime::pipeline::{
network::Ingress, Context, ManyOut, Operator, SegmentSource, ServiceBackend, SingleIn, Source,
};
use dynamo_runtime::{protocols::Endpoint as EndpointId, DistributedRuntime};
use dynamo_runtime::{protocols::Endpoint as EndpointId, DistributedRuntime, entity::EntityChain};

use crate::EngineConfig;

Expand All @@ -50,7 +50,7 @@ pub async fn run(
.service_builder()
.create()
.await?
.endpoint(&endpoint_id.name);
.endpoint(&endpoint_id.name)?;

let (rt_fut, card): (Pin<Box<dyn Future<Output = _> + Send + 'static>>, _) = match engine_config
{
Expand Down
20 changes: 10 additions & 10 deletions launch/llmctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use std::sync::Arc;
use clap::{Parser, Subcommand};

use dynamo_llm::discovery::{ModelManager, ModelWatcher};
use dynamo_llm::local_model::{LocalModel, ModelNetworkName};
use dynamo_llm::local_model::LocalModel;
use dynamo_llm::model_type::ModelType;
use dynamo_runtime::component::Endpoint;
use dynamo_runtime::entity::{EntityChain, Endpoint};
use dynamo_runtime::pipeline::RouterMode;
use dynamo_runtime::{
distributed::DistributedConfig, logging, DistributedRuntime, Result, Runtime, Worker,
Expand Down Expand Up @@ -278,9 +278,9 @@ async fn list_models(
models.push(ModelRow {
model_type: entry.model_type.as_str().to_string(),
name: entry.name,
namespace: entry.endpoint.namespace,
component: entry.endpoint.component,
endpoint: entry.endpoint.name,
namespace: entry.instance.identifier().namespace_name().to_string(),
component: entry.instance.identifier().component_name().unwrap().to_string(), // safe because instance has component_name
endpoint: entry.instance.identifier().endpoint_name().unwrap().to_string(), // safe because instance has endpoint_name
});
}

Expand Down Expand Up @@ -324,10 +324,10 @@ async fn remove_model(
.into_iter()
.filter(|entry| entry.model_type == model_type)
{
let network_name = ModelNetworkName::from_entry(&entry, 0);
tracing::debug!("deleting key: {network_name}");
let instance_name = entry.instance.to_string();
tracing::debug!("deleting key: {instance_name}");
etcd_client
.kv_delete(network_name.to_string(), None)
.kv_delete(instance_name, None)
.await?;
}

Expand All @@ -353,7 +353,7 @@ fn endpoint_from_name(

let component = distributed
.namespace(namespace)?
.component(component_name)?;
.component(&component_name)?;

Ok(component.endpoint(endpoint_name))
Ok(component.endpoint(&endpoint_name)?)
}
3 changes: 2 additions & 1 deletion lib/bindings/c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::sync::atomic::{AtomicU32, Ordering};
use dynamo_llm::kv_router::{
indexer::compute_block_hash_for_seq, protocols::*, publisher::KvEventPublisher,
};
use dynamo_runtime::entity::EntityChain;
use dynamo_runtime::{DistributedRuntime, Worker};
static WK: OnceCell<Worker> = OnceCell::new();
static DRT: AsyncOnceCell<DistributedRuntime> = AsyncOnceCell::new();
Expand Down Expand Up @@ -147,7 +148,7 @@ fn dynamo_create_kv_publisher(
.ok_or(anyhow::Error::msg("Could not get Distributed Runtime"))
{
Ok(drt) => {
let backend = drt.namespace(namespace)?.component(component)?;
let backend = drt.namespace(&namespace)?.component(&component)?;
KvEventPublisher::new(backend, worker_id, kv_block_size, None)
}
Err(e) => Err(e),
Expand Down
17 changes: 10 additions & 7 deletions lib/bindings/python/rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use dynamo_runtime::{
pipeline::{EngineStream, ManyOut, SingleIn},
protocols::annotated::Annotated as RsAnnotated,
traits::DistributedRuntimeProvider,
entity::EntityChain
};

use dynamo_llm::{self as llm_rs};
Expand Down Expand Up @@ -164,21 +165,21 @@ struct CancellationToken {
#[pyclass]
#[derive(Clone)]
struct Namespace {
inner: rs::component::Namespace,
inner: rs::entity::Namespace,
event_loop: PyObject,
}

#[pyclass]
#[derive(Clone)]
struct Component {
inner: rs::component::Component,
inner: rs::entity::Component,
event_loop: PyObject,
}

#[pyclass]
#[derive(Clone)]
struct Endpoint {
inner: rs::component::Endpoint,
inner: rs::entity::Endpoint,
event_loop: PyObject,
}

Expand Down Expand Up @@ -230,7 +231,7 @@ impl DistributedRuntime {

fn namespace(&self, name: String) -> PyResult<Namespace> {
Ok(Namespace {
inner: self.inner.namespace(name).map_err(to_pyerr)?,
inner: self.inner.namespace(&name).map_err(to_pyerr)?,
event_loop: self.event_loop.clone(),
})
}
Expand Down Expand Up @@ -416,7 +417,7 @@ impl CancellationToken {
#[pymethods]
impl Component {
fn endpoint(&self, name: String) -> PyResult<Endpoint> {
let inner = self.inner.endpoint(name);
let inner = self.inner.endpoint(&name).map_err(to_pyerr)?;
Ok(Endpoint {
inner,
event_loop: self.event_loop.clone(),
Expand Down Expand Up @@ -481,7 +482,7 @@ impl Endpoint {
#[pymethods]
impl Namespace {
fn component(&self, name: String) -> PyResult<Component> {
let inner = self.inner.component(name).map_err(to_pyerr)?;
let inner = self.inner.component(&name).map_err(to_pyerr)?;
Ok(Component {
inner,
event_loop: self.event_loop.clone(),
Expand Down Expand Up @@ -598,11 +599,13 @@ impl Client {
/// Replaces wait_for_endpoints.
fn wait_for_instances<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
let inner = self.router.client.clone();

// Safety: We don't expose static, so instance_id will exist
pyo3_async_runtimes::tokio::future_into_py(py, async move {
inner
.wait_for_instances()
.await
.map(|v| v.into_iter().map(|cei| cei.id()).collect::<Vec<i64>>())
.map(|v| v.into_iter().map(|cei| cei.instance_id().unwrap()).collect::<Vec<i64>>())
.map_err(to_pyerr)
})
}
Expand Down
4 changes: 2 additions & 2 deletions lib/llm/src/discovery/model_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use std::sync::Arc;

use dynamo_runtime::transports::etcd;
use dynamo_runtime::{
protocols,
slug::Slug,
storage::key_value_store::{EtcdStorage, KeyValueStore, KeyValueStoreManager},
descriptor::Instance
};
use serde::{Deserialize, Serialize};

Expand All @@ -24,7 +24,7 @@ pub struct ModelEntry {
pub name: String,

/// How to address this on the network
pub endpoint: protocols::Endpoint,
pub instance: Instance,

/// Specifies whether the model is a chat, completions, etc model.
pub model_type: ModelType,
Expand Down
2 changes: 1 addition & 1 deletion lib/llm/src/discovery/model_manager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use dynamo_runtime::component::Component;
use dynamo_runtime::entity::Component;

use crate::discovery::ModelEntry;

Expand Down
16 changes: 10 additions & 6 deletions lib/llm/src/discovery/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use dynamo_runtime::{
transports::etcd::{KeyValue, WatchEvent},
DistributedRuntime,
};
use dynamo_runtime::entity::{EntityChain, ToEntity};

use crate::{
backend::Backend,
Expand Down Expand Up @@ -160,12 +161,15 @@ impl ModelWatcher {
// Handles a PUT event from etcd, this usually means adding a new model to the list of served
// models.
async fn handle_put(&self, model_entry: &ModelEntry) -> anyhow::Result<()> {
let endpoint_id = model_entry.endpoint.clone();
let component = self
.drt
.namespace(&endpoint_id.namespace)?
.component(&endpoint_id.component)?;
let client = component.endpoint(&endpoint_id.name).client().await?;
let instance = model_entry.instance.clone();
let endpoint = instance.to_entity(self.drt.clone())?;
let component = endpoint.component();
let client = endpoint.client().await?;
// let component = self
// .drt
// .namespace(&endpoint_id.namespace)?
// .component(&endpoint_id.component)?;
// let client = component.endpoint(&endpoint_id.name).client().await?;

let Some(etcd_client) = self.drt.etcd_client() else {
// Should be impossible because we only get here on an etcd event
Expand Down
2 changes: 1 addition & 1 deletion lib/llm/src/http/service/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async fn health_handler(
} else {
let endpoints: Vec<String> = model_entries
.iter()
.map(|entry| entry.endpoint.as_url())
.map(|entry| entry.instance.to_string())
.collect();
(
StatusCode::OK,
Expand Down
2 changes: 1 addition & 1 deletion lib/llm/src/kv_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;

use anyhow::Result;
use dynamo_runtime::{
component::{Component, InstanceSource},
entity::{Component, InstanceSource},
pipeline::{
async_trait, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, PushRouter,
ResponseStream, SingleIn,
Expand Down
8 changes: 4 additions & 4 deletions lib/llm/src/kv_router/metrics_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::kv_router::KV_METRICS_ENDPOINT;

use crate::kv_router::scheduler::Endpoint;
use crate::kv_router::ProcessedEndpoints;
use dynamo_runtime::component::Component;
use dynamo_runtime::entity::Component;
use dynamo_runtime::{service::EndpointInfo, utils::Duration, Result};
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
Expand All @@ -44,7 +44,7 @@ impl KvMetricsAggregator {
));

Self {
service_name: component.service_name(),
service_name: component.to_descriptor().slug().to_string(),
endpoints_rx: watch_rx,
}
}
Expand Down Expand Up @@ -96,8 +96,8 @@ pub async fn collect_endpoints_task(
) {
let backoff_delay = Duration::from_millis(100);
let scrape_timeout = Duration::from_millis(300);
let endpoint = component.endpoint(KV_METRICS_ENDPOINT);
let service_subject = endpoint.subject();
let endpoint = component.endpoint(KV_METRICS_ENDPOINT).unwrap();
let service_subject = endpoint.to_descriptor().identifier().slug().to_string();

loop {
tokio::select! {
Expand Down
4 changes: 2 additions & 2 deletions lib/llm/src/kv_router/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::kv_router::{
use async_trait::async_trait;
use dynamo_runtime::traits::{events::EventPublisher, DistributedRuntimeProvider};
use dynamo_runtime::{
component::Component,
entity::Component,
pipeline::{
network::Ingress, AsyncEngine, AsyncEngineContextProvider, ManyOut, ResponseStream,
SingleIn,
Expand Down Expand Up @@ -485,7 +485,7 @@ impl WorkerMetricsPublisher {
let handler = Ingress::for_engine(handler)?;

component
.endpoint(KV_METRICS_ENDPOINT)
.endpoint(KV_METRICS_ENDPOINT)?
.endpoint_builder()
.stats_handler(move |_| {
let metrics = metrics_rx.borrow_and_update().clone();
Expand Down
2 changes: 1 addition & 1 deletion lib/llm/src/kv_router/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use dynamo_runtime::component::Namespace;
use dynamo_runtime::entity::Namespace;
use dynamo_runtime::traits::events::EventPublisher;
use rand::Rng;
use serde::{Deserialize, Serialize};
Expand Down
Loading