Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ pyx_library(
"//src/ray/gcs_rpc_client:global_state_accessor_lib",
"//src/ray/protobuf:serialization_cc_proto",
"//src/ray/pubsub:python_gcs_subscriber",
"//src/ray/raylet_rpc_client:raylet_pxi_client",
"//src/ray/thirdparty/setproctitle",
"//src/ray/util:memory",
"//src/ray/util:raii",
Expand Down
4 changes: 1 addition & 3 deletions ci/env/install-core-prerelease-dependencies.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,5 @@ set -e

# install all unbounded dependencies in setup.py and any additional test dependencies
# for the min build for ray core
# TODO(scv119) reenable grpcio once https://github.com/grpc/grpc/issues/31885 is fixed.
# TODO(scv119) reenable jsonschema once https://github.com/ray-project/ray/issues/33411 is fixed.
DEPS=(requests protobuf pytest-httpserver==1.1.3)
DEPS=(requests protobuf pytest-httpserver==1.1.3 jsonschema==4.23.0)
python -m pip install -U --pre --upgrade-strategy=eager "${DEPS[@]}"
4 changes: 0 additions & 4 deletions ci/lint/pydoclint-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -655,10 +655,6 @@ python/ray/autoscaler/v2/autoscaler.py
DOC103: Method `Autoscaler.__init__`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [config_reader: IConfigReader, event_logger: Optional[AutoscalerEventLogger], gcs_client: GcsClient, metrics_reporter: Optional[AutoscalerMetricsReporter], session_name: str].
--------------------
python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py
DOC001: Method `__init__` Potential formatting errors in docstring. Error message: No specification for "Args": ""
DOC001: Function/method `__init__`: Potential formatting errors in docstring. Error message: No specification for "Args": "" (Note: DOC001 could trigger other unrelated violations under this function/method too. Please fix the docstring formatting first.)
DOC101: Method `KubeRayProvider.__init__`: Docstring contains fewer arguments than in function signature.
DOC103: Method `KubeRayProvider.__init__`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [cluster_name: str, k8s_api_client: Optional[IKubernetesHttpApiClient], provider_config: Dict[str, Any]].
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

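Fix lint: these pydoclint baseline entries for `KubeRayProvider.__init__` are removed because its docstring now documents every argument.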

DOC101: Method `KubeRayProvider._get_workers_delete_info`: Docstring contains fewer arguments than in function signature.
DOC103: Method `KubeRayProvider._get_workers_delete_info`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [node_set: Set[CloudInstanceId], ray_cluster_spec: Dict[str, Any]].
DOC201: Method `KubeRayProvider._cloud_instance_from_pod` does not have a return section in docstring
Expand Down
1 change: 1 addition & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ include "includes/function_descriptor.pxi"
include "includes/buffer.pxi"
include "includes/common.pxi"
include "includes/gcs_client.pxi"
include "includes/raylet_client.pxi"
include "includes/serialization.pxi"
include "includes/libcoreworker.pxi"
include "includes/global_state_accessor.pxi"
Expand Down
17 changes: 14 additions & 3 deletions python/ray/autoscaler/_private/kuberay/node_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,12 @@ def get(self, path: str) -> Dict[str, Any]:
pass

@abstractmethod
def patch(self, path: str, payload: List[Dict[str, Any]]) -> Dict[str, Any]:
def patch(
    self,
    path: str,
    payload: List[Dict[str, Any]],
    content_type: str = "application/json-patch+json",
) -> Dict[str, Any]:
    """Wrapper for REST PATCH of resource with proper headers.

    This declaration carries no behavior of its own; the implementation
    is expected to come from a subclass.

    Args:
        path: The resource path to patch.
        payload: The patch payload.
        content_type: The content type to use for the PATCH request.
            Defaults to JSON Patch (``application/json-patch+json``).

    Returns:
        The response of the PATCH request as a dictionary.
    """
    ...

Expand Down Expand Up @@ -316,12 +321,18 @@ def get(self, path: str) -> Dict[str, Any]:
result.raise_for_status()
return result.json()

def patch(self, path: str, payload: List[Dict[str, Any]]) -> Dict[str, Any]:
def patch(
self,
path: str,
payload: List[Dict[str, Any]],
content_type: str = "application/json-patch+json",
) -> Dict[str, Any]:
"""Wrapper for REST PATCH of resource with proper headers

Args:
path: The part of the resource path that starts with the resource type.
payload: The JSON patch payload.
content_type: The content type of the merge strategy.

Returns:
The JSON response of the PATCH request.
Expand All @@ -338,7 +349,7 @@ def patch(self, path: str, payload: List[Dict[str, Any]]) -> Dict[str, Any]:
result = requests.patch(
url,
json.dumps(payload),
headers={**headers, "Content-type": "application/json-patch+json"},
headers={**headers, "Content-type": content_type},
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the `Content-type` header adjustable so callers can choose a different patch strategy; the default remains `application/json-patch+json`.

timeout=KUBERAY_REQUEST_TIMEOUT_S,
verify=verify,
)
Expand Down
1 change: 1 addition & 0 deletions python/ray/autoscaler/v2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ py_test_module_list(
"tests/test_instance_manager.py",
"tests/test_instance_storage.py",
"tests/test_instance_util.py",
"tests/test_ippr_provider.py",
"tests/test_metrics_reporter.py",
"tests/test_node_provider.py",
"tests/test_ray_installer.py",
Expand Down
1 change: 1 addition & 0 deletions python/ray/autoscaler/v2/autoscaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def _init_cloud_instance_provider(
self._cloud_instance_provider = KubeRayProvider(
config.get_config("cluster_name"),
provider_config,
gcs_client=self._gcs_client,
)
elif config.provider == Provider.READ_ONLY:
provider_config["gcs_address"] = self._gcs_client.address
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import requests

from ray._raylet import GcsClient

# TODO(rickyx): We should eventually remove these imports
# when we deprecate the v1 kuberay node provider.
from ray.autoscaler._private.kuberay.node_provider import (
Expand All @@ -24,6 +26,9 @@
worker_delete_patch,
worker_replica_patch,
)
from ray.autoscaler.v2.instance_manager.cloud_providers.kuberay.ippr_provider import (
KubeRayIPPRProvider,
)
from ray.autoscaler.v2.instance_manager.node_provider import (
CloudInstance,
CloudInstanceId,
Expand All @@ -33,7 +38,7 @@
NodeKind,
TerminateNodeError,
)
from ray.autoscaler.v2.schema import NodeType
from ray.autoscaler.v2.schema import IPPRSpecs, IPPRStatus, NodeType

logger = logging.getLogger(__name__)

Expand All @@ -51,14 +56,19 @@ def __init__(
self,
cluster_name: str,
provider_config: Dict[str, Any],
gcs_client: GcsClient,
k8s_api_client: Optional[IKubernetesHttpApiClient] = None,
):
"""
Initializes a new KubeRayProvider.

Args:
cluster_name: The name of the RayCluster resource.
provider_config: The namespace of the RayCluster.
k8s_api_client: The client to the Kubernetes API server.
This could be used to mock the Kubernetes API server for testing.
provider_config: The configuration dictionary
for the RayCluster (e.g., namespace and provider-specific settings).
gcs_client: The client to the GCS server.
k8s_api_client: The client to the Kubernetes
API server. This can be used to mock the Kubernetes API server for testing.
"""
self._cluster_name = cluster_name
self._namespace = provider_config["namespace"]
Expand All @@ -75,6 +85,9 @@ def __init__(
# Below are states that are fetched from the Kubernetes API server.
self._ray_cluster = None
self._cached_instances: Dict[CloudInstanceId, CloudInstance]
self._ippr_provider = KubeRayIPPRProvider(
gcs_client=gcs_client, k8s_api_client=self._k8s_api_client
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The KubeRayIPPRProvider needs a gcs_client to query the port and the address of a Raylet, and it also needs a k8s_api_client to patch pods.

)

@dataclass
class ScaleRequest:
Expand Down Expand Up @@ -183,6 +196,31 @@ def poll_errors(self) -> List[CloudInstanceProviderError]:
self._terminate_errors_queue = []
return errors

def get_ippr_specs(self) -> IPPRSpecs:
    """Return the IPPR specs currently held by the IPPR provider.

    This is a pure delegate: the specs themselves are cached and validated
    by the underlying IPPR provider, which refreshes them from the
    RayCluster annotation (checked against the IPPR schema) whenever this
    provider performs its periodic sync with the API server.

    Returns:
        The cached, validated IPPR specs for the cluster.
    """
    ippr_provider = self._ippr_provider
    return ippr_provider.get_ippr_specs()

def get_ippr_statuses(self) -> Dict[str, IPPRStatus]:
    """Return the most recent IPPR status of each pod.

    This is a pure delegate to the underlying IPPR provider, which
    refreshes the statuses from the current pod list whenever this
    provider performs its periodic sync with the API server.

    Returns:
        A mapping from pod name to that pod's latest IPPR status.
    """
    ippr_provider = self._ippr_provider
    return ippr_provider.get_ippr_statuses()

def do_ippr_requests(self, resizes: List[IPPRStatus]) -> None:
    """Forward IPPR resize requests to the underlying IPPR provider.

    Args:
        resizes: The per-pod IPPR actions produced by the scheduler. The
            list is passed through to the IPPR provider unchanged.
    """
    ippr_provider = self._ippr_provider
    ippr_provider.do_ippr_requests(resizes)

############################
# Private
############################
Expand Down Expand Up @@ -416,7 +454,9 @@ def _add_terminate_errors(
def _sync_with_api_server(self) -> None:
    """Refresh the provider's cached state from the Kubernetes API server.

    The steps run in a fixed order:

    1. Fetch the RayCluster resource and cache it.
    2. Pass the fetched RayCluster to the IPPR provider so it can validate
       and store the IPPR specs.
    3. Re-fetch the instances and cache them.
    4. Ask the IPPR provider to sync with the raylets.
    """
    cluster_path = f"rayclusters/{self._cluster_name}"
    ray_cluster = self._get(cluster_path)
    self._ray_cluster = ray_cluster

    # The IPPR specs are refreshed before the instances are fetched.
    self._ippr_provider.validate_and_set_ippr_specs(self._ray_cluster)

    instances = self._fetch_instances()
    self._cached_instances = instances

    # The raylet sync runs last, after the instance cache has been updated.
    self._ippr_provider.sync_with_raylets()

@property
def ray_cluster(self) -> Dict[str, Any]:
Expand Down Expand Up @@ -522,6 +562,9 @@ def _fetch_instances(self) -> Dict[CloudInstanceId, CloudInstance]:
cloud_instance = self._cloud_instance_from_pod(pod)
if cloud_instance:
cloud_instances[pod_name] = cloud_instance

self._ippr_provider.sync_ippr_status_from_pods(pod_list["items"])

return cloud_instances

@staticmethod
Expand Down
Loading