Skip to content

Commit 388ba37

Browse files
committed
[core][autoscaler][IPPR] Initial impl for resizing Pods in-place to the maximum configured by the user
Signed-off-by: Rueian <[email protected]>
1 parent 7c53106 commit 388ba37

File tree

12 files changed

+2151
-15
lines changed

12 files changed

+2151
-15
lines changed

ci/lint/pydoclint-baseline.txt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -662,10 +662,6 @@ python/ray/autoscaler/v2/autoscaler.py
662662
DOC103: Method `Autoscaler.__init__`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [config_reader: IConfigReader, event_logger: Optional[AutoscalerEventLogger], gcs_client: GcsClient, metrics_reporter: Optional[AutoscalerMetricsReporter], session_name: str].
663663
--------------------
664664
python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py
665-
DOC001: Method `__init__` Potential formatting errors in docstring. Error message: No specification for "Args": ""
666-
DOC001: Function/method `__init__`: Potential formatting errors in docstring. Error message: No specification for "Args": "" (Note: DOC001 could trigger other unrelated violations under this function/method too. Please fix the docstring formatting first.)
667-
DOC101: Method `KubeRayProvider.__init__`: Docstring contains fewer arguments than in function signature.
668-
DOC103: Method `KubeRayProvider.__init__`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [cluster_name: str, k8s_api_client: Optional[IKubernetesHttpApiClient], provider_config: Dict[str, Any]].
669665
DOC101: Method `KubeRayProvider._get_workers_delete_info`: Docstring contains fewer arguments than in function signature.
670666
DOC103: Method `KubeRayProvider._get_workers_delete_info`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [node_set: Set[CloudInstanceId], ray_cluster_spec: Dict[str, Any]].
671667
DOC201: Method `KubeRayProvider._cloud_instance_from_pod` does not have a return section in docstring

python/ray/autoscaler/_private/kuberay/node_provider.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,12 @@ def get(self, path: str) -> Dict[str, Any]:
259259
pass
260260

261261
@abstractmethod
262-
def patch(self, path: str, payload: List[Dict[str, Any]]) -> Dict[str, Any]:
262+
def patch(
263+
self,
264+
path: str,
265+
payload: List[Dict[str, Any]],
266+
content_type: str = "application/json-patch+json",
267+
) -> Dict[str, Any]:
263268
"""Wrapper for REST PATCH of resource with proper headers."""
264269
pass
265270

@@ -311,12 +316,18 @@ def get(self, path: str) -> Dict[str, Any]:
311316
result.raise_for_status()
312317
return result.json()
313318

314-
def patch(self, path: str, payload: List[Dict[str, Any]]) -> Dict[str, Any]:
319+
def patch(
320+
self,
321+
path: str,
322+
payload: List[Dict[str, Any]],
323+
content_type: str = "application/json-patch+json",
324+
) -> Dict[str, Any]:
315325
"""Wrapper for REST PATCH of resource with proper headers
316326
317327
Args:
318328
path: The part of the resource path that starts with the resource type.
319329
payload: The JSON patch payload.
330+
content_type: The content type of the merge strategy.
320331
321332
Returns:
322333
The JSON response of the PATCH request.
@@ -333,7 +344,7 @@ def patch(self, path: str, payload: List[Dict[str, Any]]) -> Dict[str, Any]:
333344
result = requests.patch(
334345
url,
335346
json.dumps(payload),
336-
headers={**headers, "Content-type": "application/json-patch+json"},
347+
headers={**headers, "Content-type": content_type},
337348
verify=verify,
338349
)
339350
if not result.status_code == 200:

python/ray/autoscaler/v2/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ py_test_module_list(
2222
"tests/test_instance_manager.py",
2323
"tests/test_instance_storage.py",
2424
"tests/test_instance_util.py",
25+
"tests/test_ippr_provider.py",
2526
"tests/test_metrics_reporter.py",
2627
"tests/test_node_provider.py",
2728
"tests/test_ray_installer.py",

python/ray/autoscaler/v2/autoscaler.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ def _init_cloud_instance_provider(
101101
self._cloud_instance_provider = KubeRayProvider(
102102
config.get_config("cluster_name"),
103103
provider_config,
104+
gcs_client=self._gcs_client,
104105
)
105106
elif config.provider == Provider.READ_ONLY:
106107
provider_config["gcs_address"] = self._gcs_client.address

python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
worker_delete_patch,
2424
worker_replica_patch,
2525
)
26+
from ray._raylet import GcsClient
27+
from ray.autoscaler.v2.instance_manager.cloud_providers.kuberay.ippr_provider import (
28+
KubeRayIPPRProvider,
29+
)
2630
from ray.autoscaler.v2.instance_manager.node_provider import (
2731
CloudInstance,
2832
CloudInstanceId,
@@ -32,7 +36,7 @@
3236
NodeKind,
3337
TerminateNodeError,
3438
)
35-
from ray.autoscaler.v2.schema import NodeType
39+
from ray.autoscaler.v2.schema import IPPRSpecs, IPPRStatus, NodeType
3640

3741
logger = logging.getLogger(__name__)
3842

@@ -50,14 +54,19 @@ def __init__(
5054
self,
5155
cluster_name: str,
5256
provider_config: Dict[str, Any],
57+
gcs_client: Optional[GcsClient] = None,
5358
k8s_api_client: Optional[IKubernetesHttpApiClient] = None,
5459
):
5560
"""
61+
Initializes a new KubeRayProvider.
62+
5663
Args:
5764
cluster_name: The name of the RayCluster resource.
58-
provider_config: The namespace of the RayCluster.
59-
k8s_api_client: The client to the Kubernetes API server.
60-
This could be used to mock the Kubernetes API server for testing.
65+
provider_config: The configuration dictionary
66+
for the RayCluster (e.g., namespace and provider-specific settings).
67+
gcs_client: The client to the GCS server.
68+
k8s_api_client: The client to the Kubernetes
69+
API server. This can be used to mock the Kubernetes API server for testing.
6170
"""
6271
self._cluster_name = cluster_name
6372
self._namespace = provider_config["namespace"]
@@ -74,6 +83,9 @@ def __init__(
7483
# Below are states that are fetched from the Kubernetes API server.
7584
self._ray_cluster = None
7685
self._cached_instances: Dict[CloudInstanceId, CloudInstance]
86+
self._ippr_provider = KubeRayIPPRProvider(
87+
gcs_client=gcs_client, k8s_api_client=self._k8s_api_client
88+
)
7789

7890
@dataclass
7991
class ScaleRequest:
@@ -182,6 +194,31 @@ def poll_errors(self) -> List[CloudInstanceProviderError]:
182194
self._terminate_errors_queue = []
183195
return errors
184196

197+
def get_ippr_specs(self) -> IPPRSpecs:
198+
"""Return the cached, validated IPPR specs for the cluster.
199+
200+
The IPPR specs are refreshed during the provider's periodic sync with the
201+
API server by reading the RayCluster annotation and validating it against
202+
the IPPR schema.
203+
"""
204+
return self._ippr_provider.get_ippr_specs()
205+
206+
def get_ippr_statuses(self) -> Dict[str, IPPRStatus]:
207+
"""Return the latest per-pod IPPR statuses keyed by pod name.
208+
209+
These statuses are refreshed from the current pod list during the provider's
210+
periodic sync with the API server.
211+
"""
212+
return self._ippr_provider.get_ippr_statuses()
213+
214+
def do_ippr_requests(self, resizes: List[IPPRStatus]) -> None:
215+
"""Execute IPPR resize requests via the underlying IPPR provider.
216+
217+
Args:
218+
resizes: The list of per-pod IPPR actions produced by the scheduler.
219+
"""
220+
self._ippr_provider.do_ippr_requests(resizes)
221+
185222
############################
186223
# Private
187224
############################
@@ -393,7 +430,9 @@ def _add_terminate_errors(
393430
def _sync_with_api_server(self) -> None:
394431
"""Fetches the RayCluster resource from the Kubernetes API server."""
395432
self._ray_cluster = self._get(f"rayclusters/{self._cluster_name}")
433+
self._ippr_provider.validate_and_set_ippr_specs(self._ray_cluster)
396434
self._cached_instances = self._fetch_instances()
435+
self._ippr_provider.sync_with_raylets()
397436

398437
@property
399438
def ray_cluster(self) -> Dict[str, Any]:
@@ -499,6 +538,9 @@ def _fetch_instances(self) -> Dict[CloudInstanceId, CloudInstance]:
499538
cloud_instance = self._cloud_instance_from_pod(pod)
500539
if cloud_instance:
501540
cloud_instances[pod_name] = cloud_instance
541+
542+
self._ippr_provider.sync_ippr_status_from_pods(pod_list["items"])
543+
502544
return cloud_instances
503545

504546
@staticmethod

0 commit comments

Comments
 (0)