Commit 4347dfc

fix(chore): Addressed more review comments
Signed-off-by: Christian Pinto <[email protected]>
1 parent 57a1d64 commit 4347dfc

File tree

3 files changed: +81 -52 lines changed


plugins/actuators/vllm_performance/ado_actuators/vllm_performance/deployment_management.py

Lines changed: 13 additions & 10 deletions
@@ -46,15 +46,15 @@ async def wait(self, request_id: str, k8s_name: str, model: str) -> None:
                 )
             )
             await waiter.model_downloaded_event.wait()
-            # If after we got awaken the model is not among the downloaded models, it means that
+            # If after we got awoken the model is not among the downloaded models, it means that
             # something has gone wrong, such as the deployment we were waiting for has failed.
             # If I am the first to wake up, let me add myself as the deployment to be waited for and stop waiting.
             if (
                 model not in self.model_already_downloaded
                 and not self.maybe_add_deployment(k8s_name=k8s_name, model=model)
             ):
                 # If I am not the first to wake up, I get the new waiter object and continue waiting
-                waiter = self.deployments_to_wait_for.get(model, None)
+                waiter = self.deployments_to_wait_for.get(model)
                 continue

             console.put.remote(
@@ -67,11 +67,14 @@ async def wait(self, request_id: str, k8s_name: str, model: str) -> None:
             break

     def signal(self, k8s_name: str, model: str, error: bool = False) -> None:
-        if model in self.deployments_to_wait_for:
-            waiter = self.deployments_to_wait_for.pop(model)
-            assert (
-                waiter.k8s_name == k8s_name
-            ), f"This environment deployment ({k8s_name}) shouldn't have been created because it is conflicting with deployment {waiter.k8s_name}"
-            if not error:
-                self.model_already_downloaded.add(model)
-            waiter.model_downloaded_event.set()
+        if model not in self.deployments_to_wait_for:
+            return
+
+        waiter = self.deployments_to_wait_for.pop(model)
+        if waiter.k8s_name != k8s_name:
+            raise ValueError(
+                f"This environment deployment ({k8s_name}) shouldn't have been created because it is conflicting with deployment {waiter.k8s_name}"
+            )
+        if not error:
+            self.model_already_downloaded.add(model)
+        waiter.model_downloaded_event.set()
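
For context, wait() and signal() implement a small conflict manager: concurrent measurements that need the same model block on an asyncio.Event, and after a failed deployment the first waiter to wake re-registers itself as the deployment the others should wait for. A minimal, self-contained sketch of the pattern follows. It is an illustration, not the plugin's code: the Waiter dataclass and the return convention of maybe_add_deployment (True when the caller successfully registered itself) are assumptions inferred from how the diff calls them, and the Ray actor plumbing, request ids, and console messages are omitted.

import asyncio
from dataclasses import dataclass, field


@dataclass
class Waiter:
    k8s_name: str
    model_downloaded_event: asyncio.Event = field(default_factory=asyncio.Event)


class DeploymentConflictManager:
    def __init__(self) -> None:
        self.deployments_to_wait_for: dict[str, Waiter] = {}
        self.model_already_downloaded: set[str] = set()

    def maybe_add_deployment(self, k8s_name: str, model: str) -> bool:
        # Assumed semantics: register ourselves as the deployment others
        # should wait for; True means we registered, False means another
        # deployment for this model already holds the slot.
        if model in self.deployments_to_wait_for:
            return False
        self.deployments_to_wait_for[model] = Waiter(k8s_name=k8s_name)
        return True

    async def wait(self, k8s_name: str, model: str) -> None:
        waiter = self.deployments_to_wait_for[model]
        while True:
            await waiter.model_downloaded_event.wait()
            if (
                model not in self.model_already_downloaded
                and not self.maybe_add_deployment(k8s_name=k8s_name, model=model)
            ):
                # Someone else woke first and re-registered: pick up the new
                # waiter and keep waiting on its event.
                waiter = self.deployments_to_wait_for.get(model)
                continue
            # Either the model downloaded successfully, or we are now the
            # registered deployment and should proceed to deploy it ourselves.
            break

    def signal(self, k8s_name: str, model: str, error: bool = False) -> None:
        if model not in self.deployments_to_wait_for:
            return
        waiter = self.deployments_to_wait_for.pop(model)
        if waiter.k8s_name != k8s_name:
            raise ValueError(f"conflicting deployment {waiter.k8s_name}")
        if not error:
            self.model_already_downloaded.add(model)
        waiter.model_downloaded_event.set()

Swapping the assert in signal() for a ValueError is a meaningful part of this change: asserts are stripped under python -O, while the ValueError always surfaces a conflicting k8s_name.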

plugins/actuators/vllm_performance/ado_actuators/vllm_performance/env_manager.py

Lines changed: 50 additions & 23 deletions
@@ -3,13 +3,13 @@

 import asyncio
 import logging
-import time
 from enum import Enum

 import ray
 from ado_actuators.vllm_performance.deployment_management import (
     DeploymentConflictManager,
 )
+from ado_actuators.vllm_performance.k8s import K8sEnvironmentCreationError
 from ado_actuators.vllm_performance.k8s.manage_components import (
     ComponentsManager,
 )
@@ -105,18 +105,18 @@ def __init__(
             pvc_template=pvc_template,
         )

-    def _wipe_deployment(self, identifier: str) -> None:
+    def _delete_environment_k8s_resources(self, k8s_name: str) -> None:
         """
         Deletes a deployment. Intended to be used for cleanup or error recovery
         param: identifier: the deployment identifier
         """
         try:
-            self.manager.delete_service(k8s_name=identifier)
+            self.manager.delete_service(k8s_name=k8s_name)
         except ApiException as e:
             if e.reason != "Not Found":
                 raise e
         try:
-            self.manager.delete_deployment(k8s_name=identifier)
+            self.manager.delete_deployment(k8s_name=k8s_name)
         except ApiException as e:
             if e.reason != "Not Found":
                 raise e
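
The renamed helper keeps its idempotent behavior: a service or deployment that is already gone ("Not Found") is treated as successfully deleted, and only other API errors propagate. A minimal sketch of that pattern against the official kubernetes Python client (the function name, namespace, and API object here are illustrative; the plugin routes the calls through its own ComponentsManager):

from kubernetes import client
from kubernetes.client.rest import ApiException


def delete_deployment_if_exists(apps: client.AppsV1Api, name: str, namespace: str) -> None:
    # Treat "Not Found" as success so cleanup and retries stay safe.
    try:
        apps.delete_namespaced_deployment(name=name, namespace=namespace)
    except ApiException as e:
        if e.reason != "Not Found":
            raise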
@@ -138,9 +138,6 @@ def get_environment(self, model: str, definition: str) -> Environment | None:
         :param increment_usage: increment usage flag
         :return: environment state
         """
-        print(
-            f"getting environment for model {model}, currently {self.active_environments} deployments"
-        )

         # check if there's an existing free environment satisfying the request
         env = self.get_matching_free_environment(definition)
@@ -153,24 +150,54 @@ def get_environment(self, model: str, definition: str) -> Environment | None:
                 f"There are already {self.max_concurrent} actively in use, and I can't create a new one"
             )
             return None
-        # There are unused environments, let's evict one
-
-        # Gets the oldest env in the list
-        environment_to_evict = self.free_environments[0]
-        try:
-            self.manager.delete_service(k8s_name=environment_to_evict.k8s_name)
-            self.manager.delete_deployment(
-                k8s_name=environment_to_evict.k8s_name
-            )
+
+        # There are unused environments, let's try to evict one
+        environment_evicted = False
+        eviction_index = 0
+        # Continue looping until we find one environment that can be successfully evicted or we have gone through them all
+        while not environment_evicted and eviction_index < len(
+            self.free_environments
+        ):
+            environment_to_evict = self.free_environments[eviction_index]
+            try:
+                # _delete_environment_k8s_resources will not raise an error if, for whatever reason, the service
+                # or the deployment we are trying to delete does not exist anymore, and we assume
+                # the deployment was properly deleted.
+                self._delete_environment_k8s_resources(
+                    k8s_name=environment_to_evict.k8s_name
+                )
+            except ApiException as e:
+                # If we can't delete this environment we try with the next one, but we do not
+                # delete the current env from the free list. This is to avoid spawning more pods than the maximum configured
+                # in the case the failing ones are still running.
+                # Since the current eviction candidate environment will stay in the free ones, some other measurement might
+                # try to evict again and perhaps succeed (e.g., connection restored to the cluster).
+                logger.critical(
+                    f"Error deleting deployment or service {environment_to_evict.k8s_name}: {e}"
+                )
+                eviction_index += 1
+                continue

             logger.info(
                 f"deleted environment {environment_to_evict.k8s_name}. "
                 f"Active environments {self.active_environments}"
             )
-        except ApiException as e:
-            logger.error(f"Error deleting deployment or service {e}")
-        # If all the Kubernetes resources got deleted, let's remove this environment from our records
-        self.free_environments.pop(0)
-        time.sleep(3)
+            environment_evicted = True
+
+        if environment_evicted:
+            # successfully deleted an environment
+            self.free_environments.pop(eviction_index)
+        elif len(self.in_use_environments) > 0:
+            # all the free ones failed to delete, but one or more are in use and
+            # might make room for waiting measurements. In this case we just behave as if there
+            # are no free available environments and we wait.
+            return None
+        else:
+            # None of the free environments could be evicted due to errors and none are in use.
+            # To avoid a deadlock of the operation we fail the measurement
+            raise K8sEnvironmentCreationError(
+                "All free environments failed deleting and none are currently in use."
+            )

         # We either made space or we had enough space already
         env = Environment(model=model, configuration=definition)
@@ -211,7 +238,7 @@ def done_creating(self, identifier: str) -> None:

     def cleanup_failed_deployment(self, identifier: str) -> None:
         env = self.in_use_environments[identifier]
-        self._wipe_deployment(identifier=identifier)
+        self._delete_environment_k8s_resources(k8s_name=identifier)
         self.done_using(identifier=identifier, reclaim_on_completion=True)
         self.deployment_conflict_manager.signal(
             k8s_name=identifier, model=env.model, error=True
@@ -259,7 +286,7 @@ def cleanup(self) -> None:
         logger.info("Cleaning environments")
         all_envs = list(self.in_use_environments.values()) + self.free_environments
         for env in all_envs:
-            self._wipe_deployment(identifier=env.k8s_name)
+            self._delete_environment_k8s_resources(k8s_name=env.k8s_name)

         # We only delete the PVC if it was created by this actuator
         if self.manager.pvc_created:
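
Stripped of logging and Ray details, the new eviction logic walks the free list until one environment's Kubernetes resources actually delete, and only then removes it from the bookkeeping; if nothing can be evicted it either waits on the in-use environments or fails outright. A condensed sketch of that decision tree (the evict_one helper, its arguments, and the locally defined error class are stand-ins for the plugin's own types):

from kubernetes.client.rest import ApiException


class K8sEnvironmentCreationError(Exception):
    """Stand-in for the exception imported from the k8s package."""


def evict_one(free_environments, in_use_environments, delete_resources) -> bool:
    # Returns True if an environment was evicted, False if the caller should
    # wait for an in-use environment to free up, and raises otherwise.
    for index, env in enumerate(free_environments):
        try:
            delete_resources(env.k8s_name)  # idempotent, as sketched above
        except ApiException:
            # Keep the environment in the free list: its pods may still be
            # running, and dropping it now could exceed the pod budget.
            continue
        free_environments.pop(index)
        return True
    if in_use_environments:
        # In-use environments may make room later; behave as "no space yet".
        return False
    # Nothing evictable and nothing in use: fail instead of deadlocking.
    raise K8sEnvironmentCreationError(
        "All free environments failed deleting and none are currently in use."
    )

Unlike the old code, which always popped index 0 and slept for three seconds regardless of whether the delete succeeded, a failed delete now leaves the environment in the free list, so the configured pod maximum cannot be exceeded silently.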

plugins/actuators/vllm_performance/ado_actuators/vllm_performance/experiment_executor.py

Lines changed: 18 additions & 19 deletions
@@ -16,6 +16,10 @@
     EnvironmentManager,
     EnvironmentState,
 )
+from ado_actuators.vllm_performance.k8s import (
+    K8sConnectionError,
+    K8sEnvironmentCreationError,
+)
 from ado_actuators.vllm_performance.k8s.create_environment import (
     create_test_environment,
 )
@@ -41,14 +45,6 @@
 logger = logging.getLogger(__name__)


-class K8EnvironmentCreationError(Exception):
-    """Error raised when K8 environment cannot be created for some reason"""
-
-
-class K8ConnectionError(Exception):
-    """Error raised when there is an issue connecting to K8s or a service its hosting"""
-
-
 def _build_entity_env(values: dict[str, str]) -> str:
     """
     This is the list of entity parameters that define the environment:
@@ -109,7 +105,7 @@ def _create_environment(
     :param timeout: timeout
     :return: kubernetes environment name

-    :raises K8EnvironmentCreationError if there was an issue
+    :raises K8sEnvironmentCreationError if there was an issue
     - If the creation step fails after three attempts
     - If after creation the environment was not in ready state after timeout seconds (1200 default)

@@ -135,9 +131,12 @@ def _create_environment(
     )
     while True:

-        env: Environment = ray.get(
-            env_manager.get_environment.remote(model=model, definition=definition)
-        )
+        try:
+            env: Environment = ray.get(
+                env_manager.get_environment.remote(model=model, definition=definition)
+            )
+        except Exception as e:
+            raise e
         if env is not None:
             console.put.remote(
                 message=RichConsoleSpinnerMessage(
@@ -255,7 +254,7 @@ def _create_environment(
             )
         )

-        raise K8EnvironmentCreationError(
+        raise K8sEnvironmentCreationError(
             f"Failed to create test environment {env.k8s_name}: {error}"
         )

@@ -315,12 +314,12 @@ def _connect_to_vllm_server(
         time.sleep(5)
         # Check if there is a returncode- if there is it means port-forward exited
         if pf.returncode:
-            raise K8ConnectionError(
+            raise K8sConnectionError(
                 f"failed to start port forward to service {k8s_name} - port-forward command exited for unknown reason. Check logs."
             )
     except Exception as e:
         logger.warning(f"failed to start port forward to service {k8s_name} - {e}")
-        raise K8ConnectionError(
+        raise K8sConnectionError(
             f"failed to start port forward to service {k8s_name} - {e}"
         )

@@ -379,7 +378,7 @@ def run_resource_and_workload_experiment(

         logger.info(f"Creating K8s environment for {entity.identifier}")

-        # Will raise an K8EnvironmentCreationError if the environment could not be created
+        # Will raise a K8sEnvironmentCreationError if the environment could not be created
         k8s_name, definition = _create_environment(
             values=values,
             actuator=actuator_parameters,
@@ -388,7 +387,7 @@ def run_resource_and_workload_experiment(
             request_id=request.requestid,
         )

-        # Will raise an K8ConnectionError if a port-forward was required
+        # Will raise a K8sConnectionError if a port-forward was required
         # but could not be created
         current_port += 1
         base_url, port_forward = _connect_to_vllm_server(
@@ -428,8 +427,8 @@ def run_resource_and_workload_experiment(
         )

     except (
-        K8EnvironmentCreationError,
-        K8ConnectionError,
+        K8sEnvironmentCreationError,
+        K8sConnectionError,
         VLLMBenchmarkError,
     ) as error:
         logger.error(f"Error running tests for entity {entity.identifier}: {error}")
