diff --git a/examples/llm/components/disagg_router.py b/examples/llm/components/disagg_router.py
index 714bdc76ca..c7dbaadeb1 100644
--- a/examples/llm/components/disagg_router.py
+++ b/examples/llm/components/disagg_router.py
@@ -15,6 +15,8 @@
 
 import logging
 
+from utils.ns import get_namespace
+
 from dynamo.runtime import EtcdKvCache
 from dynamo.sdk import dynamo_context
 
@@ -38,7 +40,7 @@ async def async_init(self):
         runtime = dynamo_context["runtime"]
         self.etcd_kv_cache = await EtcdKvCache.create(
             runtime.etcd_client(),
-            "/dynamo/disagg_router/",
+            f"/{get_namespace()}/disagg_router/",
             {
                 "max_local_prefill_length": str(self.max_local_prefill_length),
                 "max_prefill_queue_size": str(self.max_prefill_queue_size),
diff --git a/examples/llm/components/frontend.py b/examples/llm/components/frontend.py
index 68bfe58707..f234fb0de2 100644
--- a/examples/llm/components/frontend.py
+++ b/examples/llm/components/frontend.py
@@ -21,6 +21,7 @@
 from components.worker import VllmWorker
 from fastapi import FastAPI
 from pydantic import BaseModel
+from utils.ns import get_namespace
 
 from dynamo import sdk
 from dynamo.sdk import async_on_shutdown, depends, service
@@ -44,7 +45,6 @@ class FrontendConfig(BaseModel):
     """Configuration for the Frontend service including model and HTTP server settings."""
 
     served_model_name: str
-    endpoint: str
     port: int = 8080
 
 
@@ -52,7 +52,7 @@
 @service(
     dynamo={
         "enabled": True,
-        "namespace": "dynamo",
+        "namespace": get_namespace(),
     },
     resources={"cpu": "10", "memory": "20Gi"},
     workers=1,
@@ -78,6 +78,8 @@ def setup_model(self):
         subprocess.run(
             [
                 "llmctl",
+                "-n",
+                get_namespace(),
                 "http",
                 "remove",
                 "chat-models",
@@ -88,11 +90,13 @@
         subprocess.run(
             [
                 "llmctl",
+                "-n",
+                get_namespace(),
                 "http",
                 "add",
                 "chat-models",
                 self.frontend_config.served_model_name,
-                self.frontend_config.endpoint,
+                f"{get_namespace()}.Processor.chat/completions",
             ],
             check=False,
         )
@@ -103,7 +107,13 @@ def start_http_server(self):
         http_binary = get_http_binary_path()
 
         self.process = subprocess.Popen(
-            [http_binary, "-p", str(self.frontend_config.port)],
+            [
+                http_binary,
+                "-p",
+                str(self.frontend_config.port),
+                "--namespace",
+                get_namespace(),
+            ],
             stdout=None,
             stderr=None,
         )
@@ -116,6 +126,8 @@ def cleanup(self):
         subprocess.run(
             [
                 "llmctl",
+                "-n",
+                get_namespace(),
                 "http",
                 "remove",
                 "chat-models",
diff --git a/examples/llm/components/kv_router.py b/examples/llm/components/kv_router.py
index 9e54fa0d0d..f0edd7bbfd 100644
--- a/examples/llm/components/kv_router.py
+++ b/examples/llm/components/kv_router.py
@@ -22,6 +22,7 @@
 
 from components.worker import VllmWorker
 from utils.logging import check_required_workers
+from utils.ns import get_namespace
 from utils.protocol import Tokens
 from vllm.logger import logger as vllm_logger
 
@@ -70,7 +71,7 @@ def parse_args(service_name, prefix) -> Namespace:
 @service(
     dynamo={
         "enabled": True,
-        "namespace": "dynamo",
+        "namespace": get_namespace(),
     },
     resources={"cpu": "10", "memory": "20Gi"},
     workers=1,
@@ -96,7 +97,7 @@ def __init__(self):
     async def async_init(self):
         self.runtime = dynamo_context["runtime"]
         self.workers_client = (
-            await self.runtime.namespace("dynamo")
+            await self.runtime.namespace(get_namespace())
             .component("VllmWorker")
             .endpoint("generate")
             .client()
@@ -104,7 +105,7 @@ async def async_init(self):
 
         await check_required_workers(self.workers_client, self.args.min_workers)
 
-        kv_listener = self.runtime.namespace("dynamo").component("VllmWorker")
+        kv_listener = self.runtime.namespace(get_namespace()).component("VllmWorker")
         await kv_listener.create_service()
         self.indexer = KvIndexer(kv_listener, self.args.block_size)
         self.metrics_aggregator = KvMetricsAggregator(kv_listener)
diff --git a/examples/llm/components/prefill_worker.py b/examples/llm/components/prefill_worker.py
index 985ea4b47e..1bf22d164e 100644
--- a/examples/llm/components/prefill_worker.py
+++ b/examples/llm/components/prefill_worker.py
@@ -21,6 +21,7 @@
 from pydantic import BaseModel
 
 from utils.nixl import NixlMetadataStore
+from utils.ns import get_namespace
 from utils.prefill_queue import PrefillQueue
 from utils.vllm import parse_vllm_args
 from vllm.entrypoints.openai.api_server import (
@@ -42,7 +43,7 @@ class RequestType(BaseModel):
 @service(
     dynamo={
         "enabled": True,
-        "namespace": "dynamo",
+        "namespace": get_namespace(),
         "custom_lease": LeaseConfig(ttl=1),  # 1 second
     },
     resources={"gpu": 1, "cpu": "10", "memory": "20Gi"},
@@ -87,7 +88,7 @@ async def async_init(self):
             raise RuntimeError("Failed to initialize engine client")
         runtime = dynamo_context["runtime"]
         metadata = self.engine_client.nixl_metadata
-        self._metadata_store = NixlMetadataStore("dynamo", runtime)
+        self._metadata_store = NixlMetadataStore(get_namespace(), runtime)
         await self._metadata_store.put(metadata.engine_id, metadata)
         self.task = asyncio.create_task(self.prefill_queue_handler())
 
@@ -119,9 +120,13 @@ async def prefill_queue_handler(self):
         logger.info("Prefill queue handler entered")
         prefill_queue_nats_server = os.getenv("NATS_SERVER", "nats://localhost:4222")
         prefill_queue_stream_name = (
-            self.engine_args.served_model_name
-            if self.engine_args.served_model_name is not None
-            else "vllm"
+            get_namespace()
+            + "_"
+            + (
+                self.engine_args.served_model_name
+                if self.engine_args.served_model_name is not None
+                else "vllm"
+            )
         )
         logger.info(
             f"Prefill queue: {prefill_queue_nats_server}:{prefill_queue_stream_name}"
diff --git a/examples/llm/components/processor.py b/examples/llm/components/processor.py
index 8b146105ef..daa822ef9e 100644
--- a/examples/llm/components/processor.py
+++ b/examples/llm/components/processor.py
@@ -23,6 +23,7 @@
 from transformers import AutoTokenizer
 from utils.chat_processor import ChatProcessor, CompletionsProcessor, ProcessMixIn
 from utils.logging import check_required_workers
+from utils.ns import get_namespace
 from utils.protocol import MyRequestOutput, Tokens, vLLMGenerateRequest
 from utils.vllm import RouterType, parse_vllm_args
 from vllm.engine.arg_utils import AsyncEngineArgs
@@ -45,7 +46,7 @@ class RequestType(Enum):
 @service(
     dynamo={
         "enabled": True,
-        "namespace": "dynamo",
+        "namespace": get_namespace(),
     },
     resources={"cpu": "10", "memory": "20Gi"},
     workers=1,
@@ -106,13 +107,13 @@ async def async_init(self):
 
         await check_required_workers(self.worker_client, self.min_workers)
 
-        kv_listener = runtime.namespace("dynamo").component("VllmWorker")
+        kv_listener = runtime.namespace(get_namespace()).component("VllmWorker")
         await kv_listener.create_service()
         self.metrics_aggregator = KvMetricsAggregator(kv_listener)
 
         self.etcd_kv_cache = await EtcdKvCache.create(
             runtime.etcd_client(),
-            "/dynamo/processor/",
+            f"/{get_namespace()}/processor/",
             {"router": self.engine_args.router},
         )
diff --git a/examples/llm/components/worker.py b/examples/llm/components/worker.py
index f65e17a865..6939a11b15 100644
--- a/examples/llm/components/worker.py
+++ b/examples/llm/components/worker.py
@@ -22,6 +22,7 @@
 from components.disagg_router import PyDisaggregatedRouter
 from components.prefill_worker import PrefillWorker
 from utils.nixl import NixlMetadataStore
+from utils.ns import get_namespace
 from utils.prefill_queue import PrefillQueue
 from utils.protocol import MyRequestOutput, vLLMGenerateRequest
 from utils.vllm import RouterType, parse_vllm_args
@@ -41,7 +42,7 @@
 @service(
     dynamo={
         "enabled": True,
-        "namespace": "dynamo",
+        "namespace": get_namespace(),
         "custom_lease": LeaseConfig(ttl=1),  # 1 second
     },
     resources={"gpu": 1, "cpu": "10", "memory": "20Gi"},
@@ -64,7 +65,7 @@ def __init__(self):
         self._prefill_queue_nats_server = os.getenv(
             "NATS_SERVER", "nats://localhost:4222"
         )
-        self._prefill_queue_stream_name = self.model_name
+        self._prefill_queue_stream_name = get_namespace() + "_" + self.model_name
         logger.info(
             f"Prefill queue: {self._prefill_queue_nats_server}:{self._prefill_queue_stream_name}"
         )
@@ -90,7 +91,7 @@ def __init__(self):
             self.engine_args.enable_prefix_caching = True
 
         os.environ["VLLM_WORKER_ID"] = str(dynamo_context.get("lease").id())
-        os.environ["VLLM_KV_NAMESPACE"] = "dynamo"
+        os.environ["VLLM_KV_NAMESPACE"] = get_namespace()
         os.environ["VLLM_KV_COMPONENT"] = class_name
 
         self.metrics_publisher = KvMetricsPublisher()
@@ -128,7 +129,7 @@ async def async_init(self):
 
         if self.engine_args.remote_prefill:
             metadata = self.engine_client.nixl_metadata
-            metadata_store = NixlMetadataStore("dynamo", runtime)
+            metadata_store = NixlMetadataStore(get_namespace(), runtime)
             await metadata_store.put(metadata.engine_id, metadata)
 
         if self.engine_args.conditional_disagg:
diff --git a/examples/llm/configs/agg.yaml b/examples/llm/configs/agg.yaml
index 2ba292d17e..ea0b9aa032 100644
--- a/examples/llm/configs/agg.yaml
+++ b/examples/llm/configs/agg.yaml
@@ -19,7 +19,6 @@ Common:
 
 Frontend:
   served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
-  endpoint: dynamo.Processor.chat/completions
   port: 8000
 
 Processor:
diff --git a/examples/llm/configs/agg_router.yaml b/examples/llm/configs/agg_router.yaml
index 9369553015..5ddcbd118b 100644
--- a/examples/llm/configs/agg_router.yaml
+++ b/examples/llm/configs/agg_router.yaml
@@ -21,7 +21,6 @@ Common:
 
 Frontend:
   served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
-  endpoint: dynamo.Processor.chat/completions
   port: 8000
 
 Processor:
diff --git a/examples/llm/configs/disagg.yaml b/examples/llm/configs/disagg.yaml
index 6999caed10..31202b6daa 100644
--- a/examples/llm/configs/disagg.yaml
+++ b/examples/llm/configs/disagg.yaml
@@ -20,7 +20,6 @@ Common:
 
 Frontend:
   served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
-  endpoint: dynamo.Processor.chat/completions
   port: 8000
 
 Processor:
diff --git a/examples/llm/configs/disagg_router.yaml b/examples/llm/configs/disagg_router.yaml
index 4902011b5b..c128eb158c 100644
--- a/examples/llm/configs/disagg_router.yaml
+++ b/examples/llm/configs/disagg_router.yaml
@@ -21,7 +21,6 @@ Common:
 
 Frontend:
   served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
-  endpoint: dynamo.Processor.chat/completions
   port: 8000
 
 Processor:
diff --git a/examples/llm/configs/multinode-405b.yaml b/examples/llm/configs/multinode-405b.yaml
index 0ef9628a29..fae49b47ae 100644
--- a/examples/llm/configs/multinode-405b.yaml
+++ b/examples/llm/configs/multinode-405b.yaml
@@ -18,7 +18,6 @@
 
 Frontend:
   served_model_name: nvidia/Llama-3.1-405B-Instruct-FP8
-  endpoint: dynamo.Processor.chat/completions
   port: 8000
 
 Processor:
diff --git a/examples/llm/configs/multinode_agg_r1.yaml b/examples/llm/configs/multinode_agg_r1.yaml
index 2872316e4a..9caf709ac7 100644
--- a/examples/llm/configs/multinode_agg_r1.yaml
+++ b/examples/llm/configs/multinode_agg_r1.yaml
@@ -19,7 +19,6 @@ Common:
 
 Frontend:
   served_model_name: deepseek-ai/DeepSeek-R1
-  endpoint: dynamo.Processor.chat/completions
   port: 8000
 
 Processor:
diff --git a/examples/llm/configs/mutinode_disagg_r1.yaml b/examples/llm/configs/mutinode_disagg_r1.yaml
index e8d0291f0c..9f1e67265d 100644
--- a/examples/llm/configs/mutinode_disagg_r1.yaml
+++ b/examples/llm/configs/mutinode_disagg_r1.yaml
@@ -21,7 +21,6 @@ Common:
 
 Frontend:
   served_model_name: deepseek-ai/DeepSeek-R1
-  endpoint: dynamo.Processor.chat/completions
   port: 8000
 
 Processor:
diff --git a/examples/llm/utils/ns.py b/examples/llm/utils/ns.py
new file mode 100644
index 0000000000..d65032321e
--- /dev/null
+++ b/examples/llm/utils/ns.py
@@ -0,0 +1,21 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+
+
+def get_namespace():
+    return os.getenv("DYN_NAMESPACE", "dynamo")
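
Note: a minimal usage sketch of the new get_namespace() helper introduced above.
The "team-a" value is hypothetical, purely for illustration, and the snippet
assumes it runs from examples/llm so that utils.ns is importable. With
DYN_NAMESPACE set, the service namespace, etcd key prefixes, llmctl
registrations, and NATS prefill-queue stream names all derive from it; unset,
the helper falls back to the previous hard-coded "dynamo" namespace, so
existing single-namespace deployments behave exactly as before.

    import os

    from utils.ns import get_namespace

    # Override: all namespaced resources pick this value up at call time.
    os.environ["DYN_NAMESPACE"] = "team-a"  # hypothetical namespace
    assert get_namespace() == "team-a"
    # e.g. the processor's etcd prefix becomes "/team-a/processor/"
    print(f"/{get_namespace()}/processor/")

    # Default: without the env var, behavior matches the old hard-coded value.
    del os.environ["DYN_NAMESPACE"]
    assert get_namespace() == "dynamo"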