Skip to content

Commit def88f1

Browse files
committed
[core][autoscaler][IPPR] Initial impl for resizing Pods in-place to the maximum configured by the user
Signed-off-by: Rueian <[email protected]>
1 parent d6d50a3 commit def88f1

File tree

13 files changed

+2153
-18
lines changed

13 files changed

+2153
-18
lines changed

ci/env/install-core-prerelease-dependencies.sh

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,5 @@ set -e
44

55
# install all unbounded dependencies in setup.py and any additional test dependencies
66
# for the min build for ray core
7-
# TODO(scv119) reenable grpcio once https://github.com/grpc/grpc/issues/31885 is fixed.
8-
# TODO(scv119) reenable jsonschema once https://github.com/ray-project/ray/issues/33411 is fixed.
9-
DEPS=(requests protobuf pytest-httpserver==1.1.3)
7+
DEPS=(requests protobuf pytest-httpserver==1.1.3 grpcio==1.74.0 jsonschema==4.23.0)
108
python -m pip install -U --pre --upgrade-strategy=eager "${DEPS[@]}"

ci/lint/pydoclint-baseline.txt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -655,10 +655,6 @@ python/ray/autoscaler/v2/autoscaler.py
655655
DOC103: Method `Autoscaler.__init__`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [config_reader: IConfigReader, event_logger: Optional[AutoscalerEventLogger], gcs_client: GcsClient, metrics_reporter: Optional[AutoscalerMetricsReporter], session_name: str].
656656
--------------------
657657
python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py
658-
DOC001: Method `__init__` Potential formatting errors in docstring. Error message: No specification for "Args": ""
659-
DOC001: Function/method `__init__`: Potential formatting errors in docstring. Error message: No specification for "Args": "" (Note: DOC001 could trigger other unrelated violations under this function/method too. Please fix the docstring formatting first.)
660-
DOC101: Method `KubeRayProvider.__init__`: Docstring contains fewer arguments than in function signature.
661-
DOC103: Method `KubeRayProvider.__init__`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [cluster_name: str, k8s_api_client: Optional[IKubernetesHttpApiClient], provider_config: Dict[str, Any]].
662658
DOC101: Method `KubeRayProvider._get_workers_delete_info`: Docstring contains fewer arguments than in function signature.
663659
DOC103: Method `KubeRayProvider._get_workers_delete_info`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [node_set: Set[CloudInstanceId], ray_cluster_spec: Dict[str, Any]].
664660
DOC201: Method `KubeRayProvider._cloud_instance_from_pod` does not have a return section in docstring

python/ray/autoscaler/_private/kuberay/node_provider.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,12 @@ def get(self, path: str) -> Dict[str, Any]:
264264
pass
265265

266266
@abstractmethod
267-
def patch(self, path: str, payload: List[Dict[str, Any]]) -> Dict[str, Any]:
267+
def patch(
268+
self,
269+
path: str,
270+
payload: List[Dict[str, Any]],
271+
content_type: str = "application/json-patch+json",
272+
) -> Dict[str, Any]:
268273
"""Wrapper for REST PATCH of resource with proper headers."""
269274
pass
270275

@@ -316,12 +321,18 @@ def get(self, path: str) -> Dict[str, Any]:
316321
result.raise_for_status()
317322
return result.json()
318323

319-
def patch(self, path: str, payload: List[Dict[str, Any]]) -> Dict[str, Any]:
324+
def patch(
325+
self,
326+
path: str,
327+
payload: List[Dict[str, Any]],
328+
content_type: str = "application/json-patch+json",
329+
) -> Dict[str, Any]:
320330
"""Wrapper for REST PATCH of resource with proper headers
321331
322332
Args:
323333
path: The part of the resource path that starts with the resource type.
324334
payload: The JSON patch payload.
335+
content_type: The Content-Type header sent with the request, which selects the patch format (defaults to JSON Patch).
325336
326337
Returns:
327338
The JSON response of the PATCH request.
@@ -338,7 +349,7 @@ def patch(self, path: str, payload: List[Dict[str, Any]]) -> Dict[str, Any]:
338349
result = requests.patch(
339350
url,
340351
json.dumps(payload),
341-
headers={**headers, "Content-type": "application/json-patch+json"},
352+
headers={**headers, "Content-type": content_type},
342353
timeout=KUBERAY_REQUEST_TIMEOUT_S,
343354
verify=verify,
344355
)

python/ray/autoscaler/v2/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ py_test_module_list(
2222
"tests/test_instance_manager.py",
2323
"tests/test_instance_storage.py",
2424
"tests/test_instance_util.py",
25+
"tests/test_ippr_provider.py",
2526
"tests/test_metrics_reporter.py",
2627
"tests/test_node_provider.py",
2728
"tests/test_ray_installer.py",

python/ray/autoscaler/v2/autoscaler.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ def _init_cloud_instance_provider(
101101
self._cloud_instance_provider = KubeRayProvider(
102102
config.get_config("cluster_name"),
103103
provider_config,
104+
gcs_client=self._gcs_client,
104105
)
105106
elif config.provider == Provider.READ_ONLY:
106107
provider_config["gcs_address"] = self._gcs_client.address

python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
import requests
99

10+
from ray._raylet import GcsClient
11+
1012
# TODO(rickyx): We should eventually remove these imports
1113
# when we deprecate the v1 kuberay node provider.
1214
from ray.autoscaler._private.kuberay.node_provider import (
@@ -24,6 +26,9 @@
2426
worker_delete_patch,
2527
worker_replica_patch,
2628
)
29+
from ray.autoscaler.v2.instance_manager.cloud_providers.kuberay.ippr_provider import (
30+
KubeRayIPPRProvider,
31+
)
2732
from ray.autoscaler.v2.instance_manager.node_provider import (
2833
CloudInstance,
2934
CloudInstanceId,
@@ -33,7 +38,7 @@
3338
NodeKind,
3439
TerminateNodeError,
3540
)
36-
from ray.autoscaler.v2.schema import NodeType
41+
from ray.autoscaler.v2.schema import IPPRSpecs, IPPRStatus, NodeType
3742

3843
logger = logging.getLogger(__name__)
3944

@@ -51,14 +56,19 @@ def __init__(
5156
self,
5257
cluster_name: str,
5358
provider_config: Dict[str, Any],
59+
gcs_client: Optional[GcsClient] = None,
5460
k8s_api_client: Optional[IKubernetesHttpApiClient] = None,
5561
):
5662
"""
63+
Initializes a new KubeRayProvider.
64+
5765
Args:
5866
cluster_name: The name of the RayCluster resource.
59-
provider_config: The namespace of the RayCluster.
60-
k8s_api_client: The client to the Kubernetes API server.
61-
This could be used to mock the Kubernetes API server for testing.
67+
provider_config: The configuration dictionary
68+
for the RayCluster (e.g., namespace and provider-specific settings).
69+
gcs_client: The client to the GCS server.
70+
k8s_api_client: The client to the Kubernetes
71+
API server. This can be used to mock the Kubernetes API server for testing.
6272
"""
6373
self._cluster_name = cluster_name
6474
self._namespace = provider_config["namespace"]
@@ -75,6 +85,9 @@ def __init__(
7585
# Below are states that are fetched from the Kubernetes API server.
7686
self._ray_cluster = None
7787
self._cached_instances: Dict[CloudInstanceId, CloudInstance]
88+
self._ippr_provider = KubeRayIPPRProvider(
89+
gcs_client=gcs_client, k8s_api_client=self._k8s_api_client
90+
)
7891

7992
@dataclass
8093
class ScaleRequest:
@@ -183,6 +196,31 @@ def poll_errors(self) -> List[CloudInstanceProviderError]:
183196
self._terminate_errors_queue = []
184197
return errors
185198

199+
def get_ippr_specs(self) -> IPPRSpecs:
200+
"""Return the cached, validated IPPR specs for the cluster.
201+
202+
The IPPR specs are refreshed during the provider's periodic sync with the
203+
API server by reading the RayCluster annotation and validating it against
204+
the IPPR schema.
205+
"""
206+
return self._ippr_provider.get_ippr_specs()
207+
208+
def get_ippr_statuses(self) -> Dict[str, IPPRStatus]:
209+
"""Return the latest per-pod IPPR statuses keyed by pod name.
210+
211+
These statuses are refreshed from the current pod list during the provider's
212+
periodic sync with the API server.
213+
"""
214+
return self._ippr_provider.get_ippr_statuses()
215+
216+
def do_ippr_requests(self, resizes: List[IPPRStatus]) -> None:
217+
"""Execute IPPR resize requests via the underlying IPPR provider.
218+
219+
Args:
220+
resizes: The list of per-pod IPPR actions produced by the scheduler.
221+
"""
222+
self._ippr_provider.do_ippr_requests(resizes)
223+
186224
############################
187225
# Private
188226
############################
@@ -416,7 +454,9 @@ def _add_terminate_errors(
416454
def _sync_with_api_server(self) -> None:
417455
"""Fetches the RayCluster resource from the Kubernetes API server, then refreshes the cached instances and IPPR state."""
418456
self._ray_cluster = self._get(f"rayclusters/{self._cluster_name}")
457+
self._ippr_provider.validate_and_set_ippr_specs(self._ray_cluster)
419458
self._cached_instances = self._fetch_instances()
459+
self._ippr_provider.sync_with_raylets()
420460

421461
@property
422462
def ray_cluster(self) -> Dict[str, Any]:
@@ -522,6 +562,9 @@ def _fetch_instances(self) -> Dict[CloudInstanceId, CloudInstance]:
522562
cloud_instance = self._cloud_instance_from_pod(pod)
523563
if cloud_instance:
524564
cloud_instances[pod_name] = cloud_instance
565+
566+
self._ippr_provider.sync_ippr_status_from_pods(pod_list["items"])
567+
525568
return cloud_instances
526569

527570
@staticmethod

0 commit comments

Comments
 (0)