From e25d5f1ca81416a865bfff13376ce00d737c6a59 Mon Sep 17 00:00:00 2001 From: wxsm Date: Mon, 28 Apr 2025 16:53:31 +0800 Subject: [PATCH 1/5] feat: easier configurable namespace for llm example --- examples/llm/components/disagg_router.py | 3 ++- examples/llm/components/frontend.py | 4 ++-- examples/llm/components/kv_router.py | 7 ++++--- examples/llm/components/prefill_worker.py | 7 ++++--- examples/llm/components/processor.py | 5 +++-- examples/llm/components/worker.py | 9 +++++---- examples/llm/configs/agg.yaml | 1 - examples/llm/configs/agg_router.yaml | 1 - examples/llm/configs/disagg.yaml | 1 - examples/llm/configs/disagg_router.yaml | 1 - examples/llm/configs/multinode-405b.yaml | 1 - examples/llm/configs/multinode_agg_r1.yaml | 1 - examples/llm/configs/mutinode_disagg_r1.yaml | 1 - examples/llm/utils/ns.py | 4 ++++ lib/llm/src/disagg_router.rs | 3 ++- 15 files changed, 26 insertions(+), 23 deletions(-) create mode 100644 examples/llm/utils/ns.py diff --git a/examples/llm/components/disagg_router.py b/examples/llm/components/disagg_router.py index 714bdc76ca..3fa21e5fe8 100644 --- a/examples/llm/components/disagg_router.py +++ b/examples/llm/components/disagg_router.py @@ -17,6 +17,7 @@ from dynamo.runtime import EtcdKvCache from dynamo.sdk import dynamo_context +from utils.ns import get_namespace logger = logging.getLogger(__name__) @@ -38,7 +39,7 @@ async def async_init(self): runtime = dynamo_context["runtime"] self.etcd_kv_cache = await EtcdKvCache.create( runtime.etcd_client(), - "/dynamo/disagg_router/", + f"/{get_namespace()}/disagg_router/", { "max_local_prefill_length": str(self.max_local_prefill_length), "max_prefill_queue_size": str(self.max_prefill_queue_size), diff --git a/examples/llm/components/frontend.py b/examples/llm/components/frontend.py index 68bfe58707..c3b3474d14 100644 --- a/examples/llm/components/frontend.py +++ b/examples/llm/components/frontend.py @@ -26,6 +26,7 @@ from dynamo.sdk import async_on_shutdown, depends, service from dynamo.sdk.lib.config import ServiceConfig from dynamo.sdk.lib.image import DYNAMO_IMAGE +from utils.ns import get_namespace logger = logging.getLogger(__name__) @@ -44,7 +45,6 @@ class FrontendConfig(BaseModel): """Configuration for the Frontend service including model and HTTP server settings.""" served_model_name: str - endpoint: str port: int = 8080 @@ -92,7 +92,7 @@ def setup_model(self): "add", "chat-models", self.frontend_config.served_model_name, - self.frontend_config.endpoint, + f"{get_namespace()}.Processor.chat/completions", ], check=False, ) diff --git a/examples/llm/components/kv_router.py b/examples/llm/components/kv_router.py index 9e54fa0d0d..bd70eaf508 100644 --- a/examples/llm/components/kv_router.py +++ b/examples/llm/components/kv_router.py @@ -24,6 +24,7 @@ from utils.logging import check_required_workers from utils.protocol import Tokens from vllm.logger import logger as vllm_logger +from utils.ns import get_namespace from dynamo.llm import AggregatedMetrics, KvIndexer, KvMetricsAggregator, OverlapScores from dynamo.sdk import async_on_start, depends, dynamo_context, dynamo_endpoint, service @@ -70,7 +71,7 @@ def parse_args(service_name, prefix) -> Namespace: @service( dynamo={ "enabled": True, - "namespace": "dynamo", + "namespace": get_namespace(), }, resources={"cpu": "10", "memory": "20Gi"}, workers=1, @@ -96,7 +97,7 @@ def __init__(self): async def async_init(self): self.runtime = dynamo_context["runtime"] self.workers_client = ( - await self.runtime.namespace("dynamo") + await self.runtime.namespace(get_namespace()) .component("VllmWorker") .endpoint("generate") .client() @@ -104,7 +105,7 @@ async def async_init(self): await check_required_workers(self.workers_client, self.args.min_workers) - kv_listener = self.runtime.namespace("dynamo").component("VllmWorker") + kv_listener = self.runtime.namespace(get_namespace()).component("VllmWorker") await kv_listener.create_service() self.indexer = KvIndexer(kv_listener, self.args.block_size) self.metrics_aggregator = KvMetricsAggregator(kv_listener) diff --git a/examples/llm/components/prefill_worker.py b/examples/llm/components/prefill_worker.py index 985ea4b47e..807ea0160d 100644 --- a/examples/llm/components/prefill_worker.py +++ b/examples/llm/components/prefill_worker.py @@ -31,6 +31,7 @@ from dynamo.sdk import async_on_start, dynamo_context, dynamo_endpoint, service from dynamo.sdk.lib.service import LeaseConfig +from utils.ns import get_namespace logger = logging.getLogger(__name__) @@ -42,7 +43,7 @@ class RequestType(BaseModel): @service( dynamo={ "enabled": True, - "namespace": "dynamo", + "namespace": get_namespace(), "custom_lease": LeaseConfig(ttl=1), # 1 second }, resources={"gpu": 1, "cpu": "10", "memory": "20Gi"}, @@ -87,7 +88,7 @@ async def async_init(self): raise RuntimeError("Failed to initialize engine client") runtime = dynamo_context["runtime"] metadata = self.engine_client.nixl_metadata - self._metadata_store = NixlMetadataStore("dynamo", runtime) + self._metadata_store = NixlMetadataStore(get_namespace(), runtime) await self._metadata_store.put(metadata.engine_id, metadata) self.task = asyncio.create_task(self.prefill_queue_handler()) @@ -118,7 +119,7 @@ def shutdown_vllm_engine(self): async def prefill_queue_handler(self): logger.info("Prefill queue handler entered") prefill_queue_nats_server = os.getenv("NATS_SERVER", "nats://localhost:4222") - prefill_queue_stream_name = ( + prefill_queue_stream_name = get_namespace() + '_' + ( self.engine_args.served_model_name if self.engine_args.served_model_name is not None else "vllm" diff --git a/examples/llm/components/processor.py b/examples/llm/components/processor.py index 8b146105ef..83d2e1c543 100644 --- a/examples/llm/components/processor.py +++ b/examples/llm/components/processor.py @@ -29,6 +29,7 @@ from vllm.entrypoints.openai.protocol import ChatCompletionRequest, CompletionRequest from vllm.outputs import RequestOutput from vllm.transformers_utils.tokenizer import AnyTokenizer +from utils.ns import get_namespace from dynamo.llm import KvMetricsAggregator from dynamo.runtime import EtcdKvCache @@ -45,7 +46,7 @@ class RequestType(Enum): @service( dynamo={ "enabled": True, - "namespace": "dynamo", + "namespace": get_namespace(), }, resources={"cpu": "10", "memory": "20Gi"}, workers=1, @@ -112,7 +113,7 @@ async def async_init(self): self.etcd_kv_cache = await EtcdKvCache.create( runtime.etcd_client(), - "/dynamo/processor/", + f"/{get_namespace()}/processor/", {"router": self.engine_args.router}, ) diff --git a/examples/llm/components/worker.py b/examples/llm/components/worker.py index f65e17a865..14122fec7b 100644 --- a/examples/llm/components/worker.py +++ b/examples/llm/components/worker.py @@ -34,6 +34,7 @@ from dynamo.llm import KvMetricsPublisher from dynamo.sdk import async_on_start, depends, dynamo_context, dynamo_endpoint, service from dynamo.sdk.lib.service import LeaseConfig +from utils.ns import get_namespace logger = logging.getLogger(__name__) @@ -41,7 +42,7 @@ @service( dynamo={ "enabled": True, - "namespace": "dynamo", + "namespace": get_namespace(), "custom_lease": LeaseConfig(ttl=1), # 1 second }, resources={"gpu": 1, "cpu": "10", "memory": "20Gi"}, @@ -64,7 +65,7 @@ def __init__(self): self._prefill_queue_nats_server = os.getenv( "NATS_SERVER", "nats://localhost:4222" ) - self._prefill_queue_stream_name = self.model_name + self._prefill_queue_stream_name = get_namespace() + '_' + self.model_name logger.info( f"Prefill queue: {self._prefill_queue_nats_server}:{self._prefill_queue_stream_name}" ) @@ -90,7 +91,7 @@ def __init__(self): self.engine_args.enable_prefix_caching = True os.environ["VLLM_WORKER_ID"] = str(dynamo_context.get("lease").id()) - os.environ["VLLM_KV_NAMESPACE"] = "dynamo" + os.environ["VLLM_KV_NAMESPACE"] = get_namespace() os.environ["VLLM_KV_COMPONENT"] = class_name self.metrics_publisher = KvMetricsPublisher() @@ -128,7 +129,7 @@ async def async_init(self): if self.engine_args.remote_prefill: metadata = self.engine_client.nixl_metadata - metadata_store = NixlMetadataStore("dynamo", runtime) + metadata_store = NixlMetadataStore(get_namespace(), runtime) await metadata_store.put(metadata.engine_id, metadata) if self.engine_args.conditional_disagg: diff --git a/examples/llm/configs/agg.yaml b/examples/llm/configs/agg.yaml index 2ba292d17e..ea0b9aa032 100644 --- a/examples/llm/configs/agg.yaml +++ b/examples/llm/configs/agg.yaml @@ -19,7 +19,6 @@ Common: Frontend: served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B - endpoint: dynamo.Processor.chat/completions port: 8000 Processor: diff --git a/examples/llm/configs/agg_router.yaml b/examples/llm/configs/agg_router.yaml index 9369553015..5ddcbd118b 100644 --- a/examples/llm/configs/agg_router.yaml +++ b/examples/llm/configs/agg_router.yaml @@ -21,7 +21,6 @@ Common: Frontend: served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B - endpoint: dynamo.Processor.chat/completions port: 8000 Processor: diff --git a/examples/llm/configs/disagg.yaml b/examples/llm/configs/disagg.yaml index 6999caed10..31202b6daa 100644 --- a/examples/llm/configs/disagg.yaml +++ b/examples/llm/configs/disagg.yaml @@ -20,7 +20,6 @@ Common: Frontend: served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B - endpoint: dynamo.Processor.chat/completions port: 8000 Processor: diff --git a/examples/llm/configs/disagg_router.yaml b/examples/llm/configs/disagg_router.yaml index 4902011b5b..c128eb158c 100644 --- a/examples/llm/configs/disagg_router.yaml +++ b/examples/llm/configs/disagg_router.yaml @@ -21,7 +21,6 @@ Common: Frontend: served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B - endpoint: dynamo.Processor.chat/completions port: 8000 Processor: diff --git a/examples/llm/configs/multinode-405b.yaml b/examples/llm/configs/multinode-405b.yaml index 0ef9628a29..fae49b47ae 100644 --- a/examples/llm/configs/multinode-405b.yaml +++ b/examples/llm/configs/multinode-405b.yaml @@ -18,7 +18,6 @@ Frontend: served_model_name: nvidia/Llama-3.1-405B-Instruct-FP8 - endpoint: dynamo.Processor.chat/completions port: 8000 Processor: diff --git a/examples/llm/configs/multinode_agg_r1.yaml b/examples/llm/configs/multinode_agg_r1.yaml index 2872316e4a..9caf709ac7 100644 --- a/examples/llm/configs/multinode_agg_r1.yaml +++ b/examples/llm/configs/multinode_agg_r1.yaml @@ -19,7 +19,6 @@ Common: Frontend: served_model_name: deepseek-ai/DeepSeek-R1 - endpoint: dynamo.Processor.chat/completions port: 8000 Processor: diff --git a/examples/llm/configs/mutinode_disagg_r1.yaml b/examples/llm/configs/mutinode_disagg_r1.yaml index e8d0291f0c..9f1e67265d 100644 --- a/examples/llm/configs/mutinode_disagg_r1.yaml +++ b/examples/llm/configs/mutinode_disagg_r1.yaml @@ -21,7 +21,6 @@ Common: Frontend: served_model_name: deepseek-ai/DeepSeek-R1 - endpoint: dynamo.Processor.chat/completions port: 8000 Processor: diff --git a/examples/llm/utils/ns.py b/examples/llm/utils/ns.py new file mode 100644 index 0000000000..4d7677c7f1 --- /dev/null +++ b/examples/llm/utils/ns.py @@ -0,0 +1,4 @@ +import os + +def get_namespace(): + return os.getenv("DYN_NAMESPACE", "dynamo") \ No newline at end of file diff --git a/lib/llm/src/disagg_router.rs b/lib/llm/src/disagg_router.rs index 17339562f5..9808d389a5 100644 --- a/lib/llm/src/disagg_router.rs +++ b/lib/llm/src/disagg_router.rs @@ -39,7 +39,8 @@ impl DisaggRouterConf { drt: Arc, model_name: &str, ) -> anyhow::Result<(Self, watch::Receiver)> { - let etcd_key = format!("public/components/disagg_router/models/chat/{}", model_name); + let namespace = std::env::var("DYN_NAMESPACE").unwrap_or_else(|_| "dynamo".to_string()); + let etcd_key = format!("{}/public/components/disagg_router/models/chat/{}", namespace, model_name); // Get the initial value if it exists let Some(etcd_client) = drt.etcd_client() else { From 23765d2518f89dedfa481ea7e9f3f10d9fd0c4f7 Mon Sep 17 00:00:00 2001 From: guokairui Date: Mon, 28 Apr 2025 17:19:39 +0800 Subject: [PATCH 2/5] fix lint --- examples/llm/components/disagg_router.py | 3 ++- examples/llm/components/frontend.py | 2 +- examples/llm/components/kv_router.py | 2 +- examples/llm/components/prefill_worker.py | 14 +++++++++----- examples/llm/components/processor.py | 2 +- examples/llm/components/worker.py | 4 ++-- examples/llm/utils/ns.py | 9 +++++---- 7 files changed, 21 insertions(+), 15 deletions(-) diff --git a/examples/llm/components/disagg_router.py b/examples/llm/components/disagg_router.py index 3fa21e5fe8..c7dbaadeb1 100644 --- a/examples/llm/components/disagg_router.py +++ b/examples/llm/components/disagg_router.py @@ -15,9 +15,10 @@ import logging +from utils.ns import get_namespace + from dynamo.runtime import EtcdKvCache from dynamo.sdk import dynamo_context -from utils.ns import get_namespace logger = logging.getLogger(__name__) diff --git a/examples/llm/components/frontend.py b/examples/llm/components/frontend.py index c3b3474d14..8346a7d05b 100644 --- a/examples/llm/components/frontend.py +++ b/examples/llm/components/frontend.py @@ -21,12 +21,12 @@ from components.worker import VllmWorker from fastapi import FastAPI from pydantic import BaseModel +from utils.ns import get_namespace from dynamo import sdk from dynamo.sdk import async_on_shutdown, depends, service from dynamo.sdk.lib.config import ServiceConfig from dynamo.sdk.lib.image import DYNAMO_IMAGE -from utils.ns import get_namespace logger = logging.getLogger(__name__) diff --git a/examples/llm/components/kv_router.py b/examples/llm/components/kv_router.py index bd70eaf508..f0edd7bbfd 100644 --- a/examples/llm/components/kv_router.py +++ b/examples/llm/components/kv_router.py @@ -22,9 +22,9 @@ from components.worker import VllmWorker from utils.logging import check_required_workers +from utils.ns import get_namespace from utils.protocol import Tokens from vllm.logger import logger as vllm_logger -from utils.ns import get_namespace from dynamo.llm import AggregatedMetrics, KvIndexer, KvMetricsAggregator, OverlapScores from dynamo.sdk import async_on_start, depends, dynamo_context, dynamo_endpoint, service diff --git a/examples/llm/components/prefill_worker.py b/examples/llm/components/prefill_worker.py index 807ea0160d..1bf22d164e 100644 --- a/examples/llm/components/prefill_worker.py +++ b/examples/llm/components/prefill_worker.py @@ -21,6 +21,7 @@ from pydantic import BaseModel from utils.nixl import NixlMetadataStore +from utils.ns import get_namespace from utils.prefill_queue import PrefillQueue from utils.vllm import parse_vllm_args from vllm.entrypoints.openai.api_server import ( @@ -31,7 +32,6 @@ from dynamo.sdk import async_on_start, dynamo_context, dynamo_endpoint, service from dynamo.sdk.lib.service import LeaseConfig -from utils.ns import get_namespace logger = logging.getLogger(__name__) @@ -119,10 +119,14 @@ def shutdown_vllm_engine(self): async def prefill_queue_handler(self): logger.info("Prefill queue handler entered") prefill_queue_nats_server = os.getenv("NATS_SERVER", "nats://localhost:4222") - prefill_queue_stream_name = get_namespace() + '_' + ( - self.engine_args.served_model_name - if self.engine_args.served_model_name is not None - else "vllm" + prefill_queue_stream_name = ( + get_namespace() + + "_" + + ( + self.engine_args.served_model_name + if self.engine_args.served_model_name is not None + else "vllm" + ) ) logger.info( f"Prefill queue: {prefill_queue_nats_server}:{prefill_queue_stream_name}" diff --git a/examples/llm/components/processor.py b/examples/llm/components/processor.py index 83d2e1c543..5d4d8bd737 100644 --- a/examples/llm/components/processor.py +++ b/examples/llm/components/processor.py @@ -23,13 +23,13 @@ from transformers import AutoTokenizer from utils.chat_processor import ChatProcessor, CompletionsProcessor, ProcessMixIn from utils.logging import check_required_workers +from utils.ns import get_namespace from utils.protocol import MyRequestOutput, Tokens, vLLMGenerateRequest from utils.vllm import RouterType, parse_vllm_args from vllm.engine.arg_utils import AsyncEngineArgs from vllm.entrypoints.openai.protocol import ChatCompletionRequest, CompletionRequest from vllm.outputs import RequestOutput from vllm.transformers_utils.tokenizer import AnyTokenizer -from utils.ns import get_namespace from dynamo.llm import KvMetricsAggregator from dynamo.runtime import EtcdKvCache diff --git a/examples/llm/components/worker.py b/examples/llm/components/worker.py index 14122fec7b..6939a11b15 100644 --- a/examples/llm/components/worker.py +++ b/examples/llm/components/worker.py @@ -22,6 +22,7 @@ from components.disagg_router import PyDisaggregatedRouter from components.prefill_worker import PrefillWorker from utils.nixl import NixlMetadataStore +from utils.ns import get_namespace from utils.prefill_queue import PrefillQueue from utils.protocol import MyRequestOutput, vLLMGenerateRequest from utils.vllm import RouterType, parse_vllm_args @@ -34,7 +35,6 @@ from dynamo.llm import KvMetricsPublisher from dynamo.sdk import async_on_start, depends, dynamo_context, dynamo_endpoint, service from dynamo.sdk.lib.service import LeaseConfig -from utils.ns import get_namespace logger = logging.getLogger(__name__) @@ -65,7 +65,7 @@ def __init__(self): self._prefill_queue_nats_server = os.getenv( "NATS_SERVER", "nats://localhost:4222" ) - self._prefill_queue_stream_name = get_namespace() + '_' + self.model_name + self._prefill_queue_stream_name = get_namespace() + "_" + self.model_name logger.info( f"Prefill queue: {self._prefill_queue_nats_server}:{self._prefill_queue_stream_name}" ) diff --git a/examples/llm/utils/ns.py b/examples/llm/utils/ns.py index 4d7677c7f1..e37a2d8e28 100644 --- a/examples/llm/utils/ns.py +++ b/examples/llm/utils/ns.py @@ -1,4 +1,5 @@ -import os - -def get_namespace(): - return os.getenv("DYN_NAMESPACE", "dynamo") \ No newline at end of file +import os + + +def get_namespace(): + return os.getenv("DYN_NAMESPACE", "dynamo") From 2cc5091aaadc551dd9e5476cb4628153a1e5ebc3 Mon Sep 17 00:00:00 2001 From: guokairui Date: Mon, 28 Apr 2025 17:23:01 +0800 Subject: [PATCH 3/5] fix cpright --- examples/llm/utils/ns.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/examples/llm/utils/ns.py b/examples/llm/utils/ns.py index e37a2d8e28..d65032321e 100644 --- a/examples/llm/utils/ns.py +++ b/examples/llm/utils/ns.py @@ -1,3 +1,19 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + import os From 28b05eb3efcf8c89c5b987875ac4c777192a19dc Mon Sep 17 00:00:00 2001 From: guokairui Date: Wed, 30 Apr 2025 11:42:54 +0800 Subject: [PATCH 4/5] frontend ns --- examples/llm/components/frontend.py | 16 ++++++++++++++-- lib/llm/src/disagg_router.rs | 3 +-- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/examples/llm/components/frontend.py b/examples/llm/components/frontend.py index 8346a7d05b..f234fb0de2 100644 --- a/examples/llm/components/frontend.py +++ b/examples/llm/components/frontend.py @@ -52,7 +52,7 @@ class FrontendConfig(BaseModel): @service( dynamo={ "enabled": True, - "namespace": "dynamo", + "namespace": get_namespace(), }, resources={"cpu": "10", "memory": "20Gi"}, workers=1, @@ -78,6 +78,8 @@ def setup_model(self): subprocess.run( [ "llmctl", + "-n", + get_namespace(), "http", "remove", "chat-models", @@ -88,6 +90,8 @@ def setup_model(self): subprocess.run( [ "llmctl", + "-n", + get_namespace(), "http", "add", "chat-models", @@ -103,7 +107,13 @@ def start_http_server(self): http_binary = get_http_binary_path() self.process = subprocess.Popen( - [http_binary, "-p", str(self.frontend_config.port)], + [ + http_binary, + "-p", + str(self.frontend_config.port), + "--namespace", + get_namespace(), + ], stdout=None, stderr=None, ) @@ -116,6 +126,8 @@ def cleanup(self): subprocess.run( [ "llmctl", + "-n", + get_namespace(), "http", "remove", "chat-models", diff --git a/lib/llm/src/disagg_router.rs b/lib/llm/src/disagg_router.rs index 9808d389a5..17339562f5 100644 --- a/lib/llm/src/disagg_router.rs +++ b/lib/llm/src/disagg_router.rs @@ -39,8 +39,7 @@ impl DisaggRouterConf { drt: Arc, model_name: &str, ) -> anyhow::Result<(Self, watch::Receiver)> { - let namespace = std::env::var("DYN_NAMESPACE").unwrap_or_else(|_| "dynamo".to_string()); - let etcd_key = format!("{}/public/components/disagg_router/models/chat/{}", namespace, model_name); + let etcd_key = format!("public/components/disagg_router/models/chat/{}", model_name); // Get the initial value if it exists let Some(etcd_client) = drt.etcd_client() else { From 6afdd57a6049eb650aad5d19bc05384c9f538ec9 Mon Sep 17 00:00:00 2001 From: guokairui Date: Wed, 30 Apr 2025 11:44:35 +0800 Subject: [PATCH 5/5] processor ns --- examples/llm/components/processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/llm/components/processor.py b/examples/llm/components/processor.py index 5d4d8bd737..daa822ef9e 100644 --- a/examples/llm/components/processor.py +++ b/examples/llm/components/processor.py @@ -107,7 +107,7 @@ async def async_init(self): await check_required_workers(self.worker_client, self.min_workers) - kv_listener = runtime.namespace("dynamo").component("VllmWorker") + kv_listener = runtime.namespace(get_namespace()).component("VllmWorker") await kv_listener.create_service() self.metrics_aggregator = KvMetricsAggregator(kv_listener)