diff --git a/dlrover/brain/python/common/constants.py b/dlrover/brain/python/common/constants.py index 4402d4dd6..6e1fee752 100644 --- a/dlrover/brain/python/common/constants.py +++ b/dlrover/brain/python/common/constants.py @@ -19,3 +19,6 @@ class Node(object): class DefaultResource(object): WORKER_CPU = 4 WORKER_MEM = 8 * 1024 * 1024 + +class UnitConvertor(object): + GIB_TO_BYTES = 1024 ** 3 diff --git a/dlrover/brain/python/config/__init__.py b/dlrover/brain/python/config/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dlrover/brain/python/config/manifests/brain-base-configmap.yaml b/dlrover/brain/python/config/manifests/brain-base-configmap.yaml new file mode 100644 index 000000000..8bf815867 --- /dev/null +++ b/dlrover/brain/python/config/manifests/brain-base-configmap.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: brain-base-config + namespace: dlrover +data: + config.json: | + { + "customize_worker_resource": { + "cpu": 16, + "memory": 64, + "gpus": 2, + "gpu_type": "huawei.com/Ascend910B3" + } + } \ No newline at end of file diff --git a/dlrover/brain/python/config/manifests/brain-base-opt-configmap.yaml b/dlrover/brain/python/config/manifests/brain-base-opt-configmap.yaml new file mode 100644 index 000000000..3d284bdc7 --- /dev/null +++ b/dlrover/brain/python/config/manifests/brain-base-opt-configmap.yaml @@ -0,0 +1,25 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: brain-opt-config + namespace: dlrover +data: + config.json: | + { + "opt_config": { + "incremental_memory_factor": 2, + "incremental_gpus_factor": 1, + "reduction_gpus_factor": 1, + "optimize_add_gpus_utilization_threshold": 10, + "optimize_reduce_gpus_utilization_threshold": 5, + "trigger_resource_elastic_time_window": 600, + "min_samples_in_window": 30, + "max_buffer_size": 200, + "vertical_elastic_scale_interval": 180, + "vertical_elastic_opt_interval": 36000, + "vertical_elastic_max_opt_num": 2, + 
"ckpt_save_status_check_max_wait_time": 120, + "ckpt_save_status_check_interval": 1, + "log_interval_check_nums": 5 + } + } \ No newline at end of file diff --git a/dlrover/brain/python/config/manifests/brain-manual-scale-configmap.yaml b/dlrover/brain/python/config/manifests/brain-manual-scale-configmap.yaml new file mode 100644 index 000000000..821b8e516 --- /dev/null +++ b/dlrover/brain/python/config/manifests/brain-manual-scale-configmap.yaml @@ -0,0 +1,16 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: brain-manual-scale-config + namespace: dlrover +data: + config.json: | + { + "customize_worker_resource": { + "cpu": 16, + "memory": 64, + "gpus": 2, + "gpu_type": "huawei.com/Ascend910B3" + }, + "job_names": "gpu-elastic-manual,test2" + } \ No newline at end of file diff --git a/dlrover/brain/python/config/manifests/brain-service-dev.yaml b/dlrover/brain/python/config/manifests/brain-service-dev.yaml new file mode 100644 index 000000000..61d47987c --- /dev/null +++ b/dlrover/brain/python/config/manifests/brain-service-dev.yaml @@ -0,0 +1,75 @@ +--- +apiVersion: v1 +kind: Service +metadata: + name: dlrover-brain + namespace: dlrover +spec: + type: NodePort + ports: + - port: 50002 + protocol: TCP + targetPort: 50002 + nodePort: 30002 + selector: + app: dlrover-brain + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: dlrover-brain + name: dlrover-brain + namespace: dlrover +spec: + replicas: 1 + selector: + matchLabels: + app: dlrover-brain + template: + metadata: + labels: + app: dlrover-brain + name: dlrover-brain + namespace: dlrover + spec: + serviceAccountName: dlrover-controller-manager + volumes: + - name: pvc-nas + persistentVolumeClaim: + claimName: elastic-train-pvc-dlrover + containers: + - command: + - /bin/bash + - -c + - (cd /xxx/DLRover && pip install -e . 
\ + && python -m dlrover.brain.python.server.server -alsologtostderr \ + --namespace dlrover --port 50002 2>&1) | + tee /data/logs/brain-logs/brain.log; exit ${PIPESTATUS[0]} + env: + - name: POD_IP + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: status.podIP + - name: TZ + value: Asia/Shanghai + - name: PYTHONPATH + value: $PYTHONPATH:$(pwd) + image: registry.cic.cmbchina.cn/cic-arm/cmb-neotrain-dlrover-brain:master-v0.7.0-hw-20260212 + imagePullPolicy: IfNotPresent + name: dlrover-brain + ports: + - containerPort: 50002 + protocol: TCP + volumeMounts: + - name: pvc-nas + mountPath: /data + resources: + limits: + cpu: "1" + memory: 1Gi + requests: + cpu: "1" + memory: 1Gi \ No newline at end of file diff --git a/dlrover/brain/python/config/manifests/brain-service.yaml b/dlrover/brain/python/config/manifests/brain-service.yaml new file mode 100644 index 000000000..20e36e3c6 --- /dev/null +++ b/dlrover/brain/python/config/manifests/brain-service.yaml @@ -0,0 +1,73 @@ +--- +apiVersion: v1 +kind: Service +metadata: + name: dlrover-brain + namespace: dlrover +spec: + type: NodePort + ports: + - port: 50002 + protocol: TCP + targetPort: 50002 + selector: + app: dlrover-brain + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: dlrover-brain + name: dlrover-brain + namespace: dlrover +spec: + replicas: 1 + selector: + matchLabels: + app: dlrover-brain + template: + metadata: + labels: + app: dlrover-brain + name: dlrover-brain + namespace: dlrover + spec: + serviceAccountName: dlrover-controller-manager + volumes: + - name: pvc-nas + persistentVolumeClaim: + claimName: elastic-train-pvc-dlrover + containers: + - command: + - /bin/bash + - -c + - (python -m dlrover.brain.python.server.server -alsologtostderr \ + --namespace dlrover --port 50002 2>&1) | + tee /data/logs/brain-logs/brain.log; exit ${PIPESTATUS[0]} + env: + - name: POD_IP + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: status.podIP + - name: TZ + value: Asia/Shanghai + - 
name: PYTHONPATH + value: $PYTHONPATH:$(pwd) + image: registry.cic.cmbchina.cn/cic-arm/cmb-neotrain-dlrover-brain:master-v0.7.0-hw-20260212 + imagePullPolicy: IfNotPresent + name: dlrover-brain + ports: + - containerPort: 50002 + protocol: TCP + volumeMounts: + - name: pvc-nas + mountPath: /data + resources: + limits: + cpu: "1" + memory: 1Gi + requests: + cpu: "1" + memory: 1Gi diff --git a/dlrover/brain/python/optimization/optalgorithm/__init__.py b/dlrover/brain/python/optimization/optalgorithm/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dlrover/brain/python/optimization/optalgorithm/base_optimize_job_resource.py b/dlrover/brain/python/optimization/optalgorithm/base_optimize_job_resource.py new file mode 100644 index 000000000..24b1fc2c3 --- /dev/null +++ b/dlrover/brain/python/optimization/optalgorithm/base_optimize_job_resource.py @@ -0,0 +1,77 @@ + +from dlrover.brain.python.common.job import ( + JobMeta, + NodeResource, + OptimizeConfig, +) +from dlrover.brain.python.common.constants import ( + UnitConvertor, +) +from dlrover.brain.python.platform.k8s.configmap import ConfigMapReader +from dlrover.python.common.log import default_logger as logger +from typing import Optional, Dict, Any + +class BaseOptimizeJobResource: + def __init__(self): + self.current_node_resource = NodeResource() + pass + + @staticmethod + def get_name() -> str: + return "BaseOptimizeJobResource" + + def generate_node_resource(self, job: JobMeta, conf: OptimizeConfig) -> NodeResource: + configmap_reader = ConfigMapReader(job.namespace, "brain-base-config") + json_data =configmap_reader.read_json_Data() + + if not isinstance(json_data, dict): + logger.warning(f"ConfigMap data is not a dictionary, type: {type(json_data)}. 
Using default empty resource.") + return self.current_node_resource + + customize_worker_resource: Optional[Dict[str, Any]] = json_data.get("customize_worker_resource", None) + + if not customize_worker_resource or not isinstance(customize_worker_resource, dict): + logger.warning("ConfigMap 'customize_worker_resource' is missing or invalid. Using default empty resource.") + return self.current_node_resource + + logger.info(f"ConfigMap customize_worker_resource data: {customize_worker_resource}") + + cpu_val = customize_worker_resource.get("cpu") + memory_val = customize_worker_resource.get("memory") + gpu_nums_val = customize_worker_resource.get("gpus") + gpu_type_val = customize_worker_resource.get("gpu_type") + + required_fields = {"cpu": cpu_val, "memory": memory_val, "gpus": gpu_nums_val, "gpu_type": gpu_type_val} + missing_fields = [k for k, v in required_fields.items() if v is None] + + if missing_fields: + logger.error(f"Missing required fields in customize_worker_resource: {missing_fields}. Using default empty resource.") + return self.current_node_resource + + try: + cpu = int(cpu_val) + memory_gib = int(memory_val) # GiB + gpu_nums = int(gpu_nums_val) + + if cpu < 0 or memory_gib < 0 or gpu_nums < 0: + raise ValueError("Resource values cannot be negative.") + + # unit conversion (GiB -> Bytes) + memory_bytes = memory_gib * UnitConvertor.GIB_TO_BYTES + gpu_type = str(gpu_type_val).strip() + if not gpu_type: + logger.warning("GPU type is empty, defaulting to 'none' or handling as per business logic.") + gpu_type = "none" + + except (ValueError, TypeError) as e: + logger.error(f"Invalid data type in customize_worker_resource: {e}. 
Using default resource.") + return self.current_node_resource + logger.info( + f"Applying custom resources -> CPU: {cpu}, Memory: {memory_gib}GiB, GPUs: {gpu_nums}, Type: {gpu_type}" + ) + return NodeResource( + cpu=cpu, + memory=memory_bytes, # GiB → B + gpu=gpu_nums, + gpu_type=gpu_type, + ) diff --git a/dlrover/brain/python/optimization/optalgorithm/manual_optimize_job_resource.py b/dlrover/brain/python/optimization/optalgorithm/manual_optimize_job_resource.py new file mode 100644 index 000000000..6833cb716 --- /dev/null +++ b/dlrover/brain/python/optimization/optalgorithm/manual_optimize_job_resource.py @@ -0,0 +1,92 @@ + +from dlrover.brain.python.common.job import ( + JobMeta, + NodeResource, + OptimizeConfig, +) +from dlrover.brain.python.common.constants import ( + UnitConvertor, +) +from dlrover.brain.python.platform.k8s.configmap import ConfigMapReader +from dlrover.brain.python.platform.k8s.monitor import ResourceMonitor +from dlrover.python.common.log import default_logger as logger +from typing import Optional, Dict, Any + +class ManualOptimizeJobResource: + def __init__(self): + self.current_node_resource = NodeResource() + pass + + @staticmethod + def get_name() -> str: + return "ManualOptimizeJobResource" + + def generate_node_resource(self, job: JobMeta, conf: OptimizeConfig) -> NodeResource: + configmap_reader = ConfigMapReader(job.namespace, "brain-manual-scale-config") + json_data =configmap_reader.read_json_Data() + + if not isinstance(json_data, dict): + logger.warning(f"ConfigMap data is not a dictionary, type: {type(json_data)}. Using default empty resource.") + return self.current_node_resource + + customize_worker_resource: Optional[Dict[str, Any]] = json_data.get("customize_worker_resource", None) + + if not customize_worker_resource or not isinstance(customize_worker_resource, dict): + logger.warning("ConfigMap 'customize_worker_resource' is missing or invalid. 
Using default empty resource.") + return self.current_node_resource + + logger.info(f"ConfigMap customize_worker_resource data: {customize_worker_resource}") + + cpu_val = customize_worker_resource.get("cpu") + memory_val = customize_worker_resource.get("memory") + gpu_nums_val = customize_worker_resource.get("gpus") + gpu_type_val = customize_worker_resource.get("gpu_type") + + required_fields = {"cpu": cpu_val, "memory": memory_val, "gpus": gpu_nums_val, "gpu_type": gpu_type_val} + missing_fields = [k for k, v in required_fields.items() if v is None] + + if missing_fields: + logger.error(f"Missing required fields in customize_worker_resource: {missing_fields}. Using default empty resource.") + return self.current_node_resource + + try: + required_gpus = int(gpu_nums_val) + cluster_idle_gpus = ResourceMonitor(job.namespace, str(gpu_type_val).strip()).get_cluster_idle_gpus() # gpu_type is only assigned further down; use the raw ConfigMap value here + except Exception as e: + logger.warning(f"Cluster check failed: {e}. Fallback to current.") + return self.current_node_resource + if cluster_idle_gpus < required_gpus: # abort when fewer GPUs are idle than requested (covers zero idle GPUs too) + logger.warning( + f"No idle GPUs or idle GPUs not enough. Scale up aborted." + f"Need GPUs is {gpu_nums_val}, cluster idle GPUs is {cluster_idle_gpus}." + ) + return self.current_node_resource + + try: + cpu = int(cpu_val) + memory_gib = int(memory_val) # GiB + gpu_nums = int(gpu_nums_val) + + if cpu < 0 or memory_gib < 0 or gpu_nums < 0: + raise ValueError("Resource values cannot be negative.") + + # unit conversion (GiB -> Bytes) + memory_bytes = memory_gib * UnitConvertor.GIB_TO_BYTES + + gpu_type = str(gpu_type_val).strip() + if not gpu_type: + logger.warning("GPU type is empty, defaulting to 'none' or handling as per business logic.") + gpu_type = "none" + + except (ValueError, TypeError) as e: + logger.error(f"Invalid data type in customize_worker_resource: {e}.
Using default resource.") + return self.current_node_resource + logger.info( + f"Applying custom resources -> CPU: {cpu}, Memory: {memory_gib}GiB, GPUs: {gpu_nums}, Type: {gpu_type}" + ) + return NodeResource( + cpu=cpu, + memory=memory_bytes, # GiB → B + gpu=gpu_nums, + gpu_type=gpu_type, + ) diff --git a/dlrover/brain/python/optimization/optalgorithm/opt_algorithm_manager.py b/dlrover/brain/python/optimization/optalgorithm/opt_algorithm_manager.py new file mode 100644 index 000000000..5fe93addc --- /dev/null +++ b/dlrover/brain/python/optimization/optalgorithm/opt_algorithm_manager.py @@ -0,0 +1,40 @@ +import threading + +from typing import Dict + +from dlrover.brain.python.common.job import ( + JobMeta, + OptimizeConfig, + NodeResource, +) +from dlrover.brain.python.common.log import default_logger as logger + +from dlrover.brain.python.optimization.optalgorithm.manual_optimize_job_resource import ManualOptimizeJobResource +from dlrover.brain.python.optimization.optalgorithm.base_optimize_job_resource import BaseOptimizeJobResource + +_locker = threading.RLock() + +class OptAlgorithmManager: + def __init__(self): + self.opt_algorithm_library: Dict[str, BaseOptimizeJobResource] = {} + self.current_node_resource = NodeResource() + self.register_optAlgorithm() + + def register_optAlgorithm(self): + self.opt_algorithm_library[BaseOptimizeJobResource.get_name()] = BaseOptimizeJobResource() + self.opt_algorithm_library[ManualOptimizeJobResource.get_name()] = ManualOptimizeJobResource() + + def generate_node_resource(self, job: JobMeta, conf: OptimizeConfig) -> NodeResource: + algorithm = conf.customized_config.get("algorithm", "") + + if algorithm not in self.opt_algorithm_library: + logger.warning(f"Invalid algorithm config for job {job.uuid}") + return self.current_node_resource + + try: + with _locker: + node_resource = self.opt_algorithm_library[algorithm].generate_node_resource(job, conf) + return node_resource + except Exception as e: + logger.error(f"Fail to 
generate node resource {job.uuid}: {e}") + return self.current_node_resource diff --git a/dlrover/brain/python/optimization/optimizer/base_optimizer.py b/dlrover/brain/python/optimization/optimizer/base_optimizer.py index 15d55b53c..7582d1edc 100644 --- a/dlrover/brain/python/optimization/optimizer/base_optimizer.py +++ b/dlrover/brain/python/optimization/optimizer/base_optimizer.py @@ -18,32 +18,32 @@ JobMeta, JobResource, NodeGroupResource, - NodeResource, + OptimizeConfig, ) from dlrover.brain.python.common.constants import ( Node, DefaultResource, ) +from dlrover.brain.python.optimization.optalgorithm.opt_algorithm_manager import OptAlgorithmManager class BaseOptimizer: def __init__(self): - pass + self.opt_algorithm_manager = OptAlgorithmManager() @staticmethod def get_name() -> str: return "BaseOptimizer" - def optimize(self, job: JobMeta) -> JobOptimizePlan: + def optimize(self, job: JobMeta, conf: OptimizeConfig) -> JobOptimizePlan: + conf.customized_config["algorithm"] = "BaseOptimizeJobResource" + current_node_resource = self.opt_algorithm_manager.generate_node_resource(job, conf) return JobOptimizePlan( - timestamp=int(time.time() * 1000), + time=int(time.time() * 1000), job_resource=JobResource( node_group_resources={ Node.NODE_TYPE_WORKER: NodeGroupResource( - resource=NodeResource( - cpu=DefaultResource.WORKER_CPU, - memory=DefaultResource.WORKER_MEM, - ), + resource=current_node_resource, ) }, ), diff --git a/dlrover/brain/python/optimization/optimizer/configmap_manual_optimizer.py b/dlrover/brain/python/optimization/optimizer/configmap_manual_optimizer.py new file mode 100644 index 000000000..bfedaea9e --- /dev/null +++ b/dlrover/brain/python/optimization/optimizer/configmap_manual_optimizer.py @@ -0,0 +1,38 @@ +import time + +from dlrover.brain.python.common.job import ( + JobOptimizePlan, + JobMeta, + JobResource, + NodeGroupResource, + OptimizeConfig, +) +from dlrover.brain.python.common.constants import ( + Node, +) +from 
dlrover.python.common.log import default_logger as logger +from dlrover.brain.python.optimization.optalgorithm.opt_algorithm_manager import OptAlgorithmManager + + +class ConfigMapManualOptimizer: + def __init__(self): + self.opt_algorithm_manager = OptAlgorithmManager() + + @staticmethod + def get_name() -> str: + return "ConfigMapManualOptimizer" + + def optimize(self, job: JobMeta, conf: OptimizeConfig) -> JobOptimizePlan: + conf.customized_config["algorithm"] = "ManualOptimizeJobResource" + current_node_resource = self.opt_algorithm_manager.generate_node_resource(job, conf) + return JobOptimizePlan( + time=int(time.time() * 1000), + job_resource=JobResource( + node_group_resources={ + Node.NODE_TYPE_WORKER: NodeGroupResource( + resource=current_node_resource, + ) + }, + ), + job_meta=job, + ) \ No newline at end of file diff --git a/dlrover/brain/python/optimization/optimizer_manager.py b/dlrover/brain/python/optimization/optimizer_manager.py index 1fb28a1fd..cbd0d03ee 100644 --- a/dlrover/brain/python/optimization/optimizer_manager.py +++ b/dlrover/brain/python/optimization/optimizer_manager.py @@ -21,6 +21,9 @@ from dlrover.brain.python.optimization.optimizer.base_optimizer import ( BaseOptimizer, ) +from dlrover.brain.python.optimization.optimizer.configmap_manual_optimizer import ( + ConfigMapManualOptimizer, +) from dlrover.brain.python.common.log import default_logger as logger from dlrover.brain.python.optimization.optimizer_router import OptimizerRouter @@ -39,6 +42,7 @@ def __init__(self): def register_optimizers(self): self.optimizers[BaseOptimizer.get_name()] = BaseOptimizer() + self.optimizers[ConfigMapManualOptimizer.get_name()] = ConfigMapManualOptimizer() def optimize( self, job: JobMeta, conf: OptimizeConfig @@ -49,7 +53,7 @@ def optimize( return None try: - plan = self.optimizers[conf.optimizer_name].optimize(job) + plan = self.optimizers[conf.optimizer_name].optimize(job, conf) return plan except Exception as e: logger.warning(f"Fail to 
optimize {job.uuid}: {e}") diff --git a/dlrover/brain/python/platform/k8s/configmap.py b/dlrover/brain/python/platform/k8s/configmap.py index c38d59308..724ccd292 100644 --- a/dlrover/brain/python/platform/k8s/configmap.py +++ b/dlrover/brain/python/platform/k8s/configmap.py @@ -10,12 +10,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from kubernetes import client, config, watch import time +import json +from kubernetes import client, config, watch from typing import Callable from dlrover.brain.python.common.log import default_logger as logger +JSON_KEY = "config.json" class ConfigMapWatcher: def __init__( @@ -66,3 +67,67 @@ def watch(self): f"Watch connection broken ({e}). Retrying in 5 seconds..." ) time.sleep(5) + +class ConfigMapReader: + def __init__(self, namespace, name): + self.name = name + self.namespace = namespace + try: + config.load_incluster_config() + except config.ConfigException: + config.load_kube_config() + + self.v1 = client.CoreV1Api() + + def read(self): + + try: + configmap = self.v1.read_namespaced_config_map(self.name, self.namespace) + + data = configmap.data + if not data: + logger.warning(f"ConfigMap {self.name} has no data") + return None + return data + except Exception as e: + logger.error(f"Failed to read ConfigMap {self.name}: {e}") + return None + + def read_json_Data(self): + + try: + configmap = self.v1.read_namespaced_config_map(self.name, self.namespace) + + data = configmap.data + if not data: + logger.warning(f"ConfigMap {self.name} has no data") + return None + + if JSON_KEY not in data: + logger.warning(f"The key {JSON_KEY} was not found in the ConfigMap") + return None + json_str_content = data[JSON_KEY] + + opt_config = json.loads(json_str_content) + if opt_config is None: + logger.warning(f"opt_config is None") + return None + return opt_config + except json.JSONDecodeError as e: + 
logger.error(f"JSON parsing failed,please check the format of the ConfigMap content:{e}") + return None + except Exception as e: + logger.error(f"Unknown error, failed to read ConfigMap {self.name}: {e}") + return None + + def get_opt_config(self): + try: + configmap_reader = ConfigMapReader(self.namespace, "brain-config") + data = configmap_reader.read_json_Data() + if not isinstance(data, dict): + return {} + result = data.get("opt_config", {}) + return result if isinstance(result, dict) else {} + except Exception as e: + logger.error(f"Failed to read opt_config from ConfigMap: {e}") + return {} \ No newline at end of file diff --git a/dlrover/brain/python/platform/k8s/monitor.py b/dlrover/brain/python/platform/k8s/monitor.py new file mode 100644 index 000000000..3fa1ce8cf --- /dev/null +++ b/dlrover/brain/python/platform/k8s/monitor.py @@ -0,0 +1,75 @@ +from kubernetes import client, config +from kubernetes.client.rest import ApiException +from dlrover.brain.python.common.log import default_logger as logger + + +class ResourceMonitor(object): + def __init__(self, namespace, gpu_resource_name): + self.namespace = namespace + self.gpu_resource_name = gpu_resource_name + # load K8s config + # In cluster,use config.load_incluster_config() + # local,use config.load_kube_config() + try: + config.load_incluster_config() + except config.ConfigException: + config.load_kube_config() # local kubeconfig mode + + + self.v1 = client.CoreV1Api() + + def get_cluster_idle_gpus(self) -> int: + """ + get idle gpus in cluster + algorithm:idles = Capacity - Allocated + """ + try: + nodes = self.v1.list_node() + + total_idle = 0 + for node in nodes.items: + node_name = node.metadata.name + + # Get the total allocatable GPUs on the node (Allocatable), which is usually equal to the number of physical GPUs installed. 
+ max_gpus = 0 + if node.status.allocatable and self.gpu_resource_name in node.status.allocatable: + max_gpus = int(node.status.allocatable[self.gpu_resource_name]) + + # Filter out the running pods scheduled to this node. + field_selector = f"spec.nodeName={node_name}" + pods = self.v1.list_pod_for_all_namespaces(field_selector=field_selector) + + # Get the number of currently allocated GPUs. + current_allocated_gpus = 0 + for pod in pods.items: + if pod.status.phase not in ['Running', 'Pending']: + continue + + for container in pod.spec.containers: + if container.resources and container.resources.requests: + request_gpus = container.resources.requests.get(self.gpu_resource_name) + if request_gpus: + current_allocated_gpus += int(request_gpus) + + idle_gpu_count = max_gpus - current_allocated_gpus + + idle_count = max(0, idle_gpu_count) + + total_idle += idle_count + + logger.info( + f"total_gpuas:{max_gpus}," + f"current_allocated_gpus:{current_allocated_gpus}," + f"idle_gpus:{idle_count}" + ) + + logger.info(f"cluster total idle gpus:{total_idle}") + + return total_idle + + except ApiException as e: + logger.error(f"Call k8s API exception:{e}") + return 0 + except Exception as e: + logger.error(f"unknown exception:{e}") + return 0 \ No newline at end of file diff --git a/dlrover/python/common/comm.py b/dlrover/python/common/comm.py index 12c01e1d8..16f78dffd 100644 --- a/dlrover/python/common/comm.py +++ b/dlrover/python/common/comm.py @@ -230,6 +230,18 @@ class RdzvBlocked(Message): blocked: bool = False reason: str = "" +@dataclass +class SaveCheckpointReady(Message): + """Report ckpt save state to master.""" + + ckpt_save_ready: bool = False + reason: str = "" + +@dataclass +class ParamTunningReady(Message): + """Report param tunning state to master.""" + + param_tunning_ready: bool = False @dataclass class HeartBeat(Message): @@ -410,6 +422,21 @@ class RendezvousState(Message): round: int = 0 group: int = 0 +@dataclass +class BrainOptGpus(Message): + 
gpu_num: int = 0 + +@dataclass +class ExecBrainResourcePlanReady(Message): + exec_opt_resource_plan_ready: bool = False + +@dataclass +class BrainOptGpusRequest(Message): + pass + +@dataclass +class ExecBrainResourcePlanRequest(Message): + pass @dataclass class PsNodesRequest(Message): diff --git a/dlrover/python/common/constants.py b/dlrover/python/common/constants.py index 60eb07562..2723b2b5f 100644 --- a/dlrover/python/common/constants.py +++ b/dlrover/python/common/constants.py @@ -11,6 +11,31 @@ # See the License for the specific language governing permissions and # limitations under the License. +class WorkerResourceScaleAction(object): + REDUCE_GPUS = "reduce_gpus" + ADD_GPUS = "add_gpus" + MANUAL = "manual" + NOT_ADJUST = "not_adjust" + +class OptimizerType(object): + BASE_OPTIMIZER = "BaseOptimizer" + MANUAL_OPTIMIZER = "ConfigMapManualOptimizer" + OOM_OPTIMIZER = "OOMOptimizer" + GPU_BOTTLENECK_OPTIMIZER = "GpuBottleneckOptimizer" + +class ElasticDimension(object): + HORIZONTAL = "horizontal" + VERTICAL = "vertical" + +class Optimizers(object): + BASE_OPTIMIZER = "BaseOptimizer" + OOM_OPTIMIZER = "OOMOptimizer" + CONFIG_MAP_OPTIMIZER = "ConfigMapOptimizer" + +class BackendType(object): + GLOO = "gloo" + NCCL = "nccl" + HCCL = "hccl" class BasicClass(object): LOG_LEVEL_ENV = "DLROVER_LOG_LEVEL" @@ -324,6 +349,8 @@ class TrainingLoopStatus(object): class NodeEnv(object): + DLROVER_ELASTIC_DIMENSION = "DLROVER_ELASTIC_DIMENSION" + RELAUNCHED_POD = "RELAUNCHED_POD" DLROVER_MASTER_ADDR = "DLROVER_MASTER_ADDR" DLROVER_MASTER_SERVICE_TYPE = "DLROVER_MASTER_SERVICE_TYPE" diff --git a/dlrover/python/common/global_context.py b/dlrover/python/common/global_context.py index 96b6c7847..2f8be09cf 100644 --- a/dlrover/python/common/global_context.py +++ b/dlrover/python/common/global_context.py @@ -12,13 +12,14 @@ # limitations under the License. 
import importlib import os -from typing import List, Tuple +from typing import List, Tuple, Optional from dlrover.python.common.constants import ( CommunicationType, PendingTimeoutStrategyType, UserEnv, HangDetectionStrategy, + ElasticDimension, ) from dlrover.python.common.log import default_logger as logger from dlrover.python.common.singleton import Singleton @@ -45,6 +46,16 @@ class ConfigKeys(object): SECONDS_TO_WAIT_FAILED_PS = "seconds_to_wait_failed_ps" HANG_CPU_USAGE_RATE = "hang_cpu_usage_rate" + # resource elastic + VERTICAL_ELASTIC_SCALE_INTERVAL = "vertical_elastic_scale_interval" + VERTICAL_ELASTIC_OPT_INTERVAL = "vertical_elastic_opt_interval" + VERTICAL_ELASTIC_MAX_OPT_NUM = "vertical_elastic_max_opt_num" + CKPT_SAVE_STATUS_CHECK_MAX_WAIT_TIME = "ckpt_save_status_check_max_wait_time" + # check interval of ckpt save status + CKPT_SAVE_STATUS_CHECK_INTERVAL = "ckpt_save_status_check_interval" + # log record interval + LOG_INTERVAL_CHECK_NUMS = "log_interval_check_nums" + class DefaultValues(object): SERVICE_TYPE = CommunicationType.COMM_SERVICE_GRPC @@ -85,6 +96,14 @@ class DefaultValues(object): MAX_GROUP_RELAUNCH_COUNT = 3 # maximum node group relaunch count TRAINING_ELASTIC_MODE = "base" + # resource elastic + VERTICAL_ELASTIC_SCALE_INTERVAL = 180 # unit is seconds + VERTICAL_ELASTIC_OPT_INTERVAL = 36000 # unit is seconds + VERTICAL_ELASTIC_MAX_OPT_NUM = 3 # maximum number of times to optimize + CKPT_SAVE_STATUS_CHECK_MAX_WAIT_TIME = 120 # unit is seconds + CKPT_SAVE_STATUS_CHECK_INTERVAL = 1 # unit is seconds + LOG_INTERVAL_CHECK_NUMS = 5 # check interval of log record + class Context(Singleton): def __init__(self): @@ -155,6 +174,17 @@ def __init__(self): self.enable_dashboard = False self.dashboard_port = 8080 + ########################## Resource elastic ######################### + self.brain_resource_plan_ready = False + self.enable_elastic_resource = False + + self.elastic_dimension = ElasticDimension.HORIZONTAL + + 
self.configmap_manual_scale_switch = "off" + self.resource_monitor_switch = "off" + + self.temp_gpu_time_window_file_prefix = "dlrover_gpu_tw_" + def set_params_from_brain(self): self.train_speed_record_num = self.get_param_value_from_brain( ConfigKeys.TRAIN_SPEED_RECORD_NUM, @@ -214,6 +244,32 @@ def set_params_from_brain(self): DefaultValues.HANG_CPU_USAGE_RATE, ) + # resource elastic + self.vertical_elastic_scale_interval = self.get_param_value_from_brain( + ConfigKeys.VERTICAL_ELASTIC_SCALE_INTERVAL, + DefaultValues.VERTICAL_ELASTIC_SCALE_INTERVAL, + ) + self.vertical_elastic_opt_interval = self.get_param_value_from_brain( + ConfigKeys.VERTICAL_ELASTIC_OPT_INTERVAL, + DefaultValues.VERTICAL_ELASTIC_OPT_INTERVAL, + ) + self.vertical_elastic_max_opt_num = self.get_param_value_from_brain( + ConfigKeys.VERTICAL_ELASTIC_MAX_OPT_NUM, + DefaultValues.VERTICAL_ELASTIC_MAX_OPT_NUM, + ) + self.ckpt_save_status_check_max_wait_time = self.get_param_value_from_brain( + ConfigKeys.CKPT_SAVE_STATUS_CHECK_MAX_WAIT_TIME, + DefaultValues.CKPT_SAVE_STATUS_CHECK_MAX_WAIT_TIME, + ) + self.ckpt_save_status_check_interval = self.get_param_value_from_brain( + ConfigKeys.CKPT_SAVE_STATUS_CHECK_INTERVAL, + DefaultValues.CKPT_SAVE_STATUS_CHECK_INTERVAL, + ) + self.log_interval_check_nums = self.get_param_value_from_brain( + ConfigKeys.LOG_INTERVAL_CHECK_NUMS, + DefaultValues.LOG_INTERVAL_CHECK_NUMS, + ) + def config_master_port(self, port=0): host_ports_env = os.getenv("HOST_PORTS", "") self.master_port = None diff --git a/dlrover/python/common/node.py b/dlrover/python/common/node.py index 1cf79c217..ad9e0e301 100644 --- a/dlrover/python/common/node.py +++ b/dlrover/python/common/node.py @@ -125,6 +125,12 @@ def resource_str_to_node_resource(cls, resource_str): if "nvidia.com" in key: gpu_type = key gpu_num = int(resource[key]) + elif "huawei.com" in key: + gpu_type = key + gpu_num = int(resource[key]) + elif "metax-tech.com" in key: + gpu_type = key + gpu_num = int(resource[key]) return 
NodeResource(cpu, memory, gpu_type, gpu_num) @classmethod @@ -185,7 +191,7 @@ def __init__( self, node_type, node_id, - config_resource: NodeResource = NodeResource(0, 0), + config_resource: NodeResource = NodeResource(0, 0, "", 0), name=None, status=NodeStatus.INITIAL, start_time=None, @@ -220,7 +226,7 @@ def __init__( self.is_released = False self.exit_reason = "" self.config_resource = config_resource - self.used_resource = NodeResource(0.0, 0.0) + self.used_resource = NodeResource(0.0, 0.0, "", 0) self.start_hang_time: float = 0 self.init_time = time.time() self.eval_time = 0 diff --git a/dlrover/python/elastic_agent/master_client.py b/dlrover/python/elastic_agent/master_client.py index 6c31b0dda..f107ceb59 100644 --- a/dlrover/python/elastic_agent/master_client.py +++ b/dlrover/python/elastic_agent/master_client.py @@ -390,6 +390,24 @@ def num_nodes_waiting(self, rdzv_name): logger.warning("Fail to query the number of waiting nodes.") return 0 + def exec_opt_res_plan_ready(self): + request=comm.ExecBrainResourcePlanRequest() + try: + result: comm.ExecBrainResourcePlanReady=self._get(request) + return result.exec_opt_resource_plan_ready + except Exception: + logger.warning("Fail to get resouce from brain") + return comm.ExecBrainResourcePlanReady().exec_opt_resource_plan_ready + + def get_gpus_from_brain_resource_plan(self): + request=comm.BrainOptGpusRequest() + try: + result: comm.BrainOptGpus=self._get(request) + return result.gpu_num + except Exception: + logger.warning("Fail to get adjust gpus from brain") + return comm.BrainOptGpus().gpu_num + def join_rendezvous(self, node_rank, local_world_size, rdzv_name=""): request = comm.JoinRendezvousRequest( node_id=self._node_id, @@ -529,6 +547,14 @@ def report_action(self, action: DiagnosisAction): ) self._report(message) + def set_save_ckpt_status(self, ckpt_save_ready, reason=""): + message = comm.SaveCheckpointReady(ckpt_save_ready=ckpt_save_ready, reason=reason) + self._report(message) + + def 
set_param_tunning_ready(self, param_tunning_ready): + message = comm.ParamTunningReady(param_tunning_ready=param_tunning_ready) + self._report(message) + @classmethod def singleton_instance(cls, *args, **kwargs): if not cls._instance: diff --git a/dlrover/python/elastic_agent/torch/training.py b/dlrover/python/elastic_agent/torch/training.py index 155967940..390ce9f1e 100644 --- a/dlrover/python/elastic_agent/torch/training.py +++ b/dlrover/python/elastic_agent/torch/training.py @@ -87,6 +87,7 @@ ScriptPath, NodeExitReason, RendezvousErrorType, + ElasticDimension, ) from dlrover.python.common.error import ProcessError from dlrover.python.common.log import default_logger as logger @@ -362,7 +363,7 @@ def auto_configure_params(self): self.accelerator = Accelerators.MTHREADS_GPU logger.info( f"Use {self.accelerator} device for training, " - f"cuda is available: {torch.cuda.is_available()}." + f"xpu is available: {torch.cuda.is_available()}." ) if not self.auto_config: @@ -1443,8 +1444,34 @@ def _invoke_run(self, role: str = DEFAULT_ROLE) -> RunResult: ): self.set_rdzv_blocked(False) self._restart_workers(self._worker_group) + else: + # vertical elastic + elastic_dimension = os.getenv( + "DLROVER_ELASTIC_DIMENSION", ElasticDimension.HORIZONTAL + ).lower() + if elastic_dimension == ElasticDimension.VERTICAL and self._client.exec_opt_res_plan_ready(): + self._save_ckpt_by_vertical_elastic() else: raise Exception(f"[{role}] worker group in {state.name} state") + + def _save_ckpt_by_vertical_elastic(self): + # self._graceful_exit_workers( + # worker_group=self._worker_group + # ) + elastic_mode = os.getenv( + "DLROVER_TRAINING_ELASTIC_MODE", "base" + ).lower() + self._save_ckpt_to_storage() + + if ( + self._worker_group.group_rank == 0 + and elastic_mode == "ucp" + ): + self.ucp() + self.set_save_ckpt_status(True) + + def set_save_ckpt_status(self, ckpt_save_ready, reason=""): + return self._client.set_save_ckpt_status(ckpt_save_ready=ckpt_save_ready, reason=reason) def 
_process_diagnosis_action(self, action: DiagnosisAction): if not action: @@ -1884,25 +1911,32 @@ def launch_agent( f"name: {entrypoint_name}, rank: {node_rank}" ) + elastic_dimension = os.getenv( + "DLROVER_ELASTIC_DIMENSION", ElasticDimension.HORIZONTAL + ).lower() + if elastic_dimension == ElasticDimension.VERTICAL: + _auto_param_tuning_by_vertical_elastic(config, args) + logger.info( f"Starting training agent with launch configs:\n" - f" entrypoint : {entrypoint_name}\n" - f" min_nodes : {config.min_nodes}\n" - f" max_nodes : {config.max_nodes}\n" - f" nproc_per_node : {config.nproc_per_node}\n" - f" run_id : {config.run_id}\n" - f" rdzv_backend : {config.rdzv_backend}\n" - f" rdzv_endpoint : {config.rdzv_endpoint}\n" - f" rdzv_configs : {config.rdzv_configs}\n" - f" max_restarts : {config.max_restarts}\n" - f" monitor_interval : {config.monitor_interval}\n" - f" log_dir : {config.get_log_dir()}\n" - f" metrics_cfg : {config.metrics_cfg}\n" - f" training_log : {config.training_log_file}\n" - f" failure_errors : {config.failure_node_errors}\n" - f" numa_affinity : {config.numa_affinity}\n" - f" accelerator : {config.accelerator}\n" - f" ucp_device_type : {config.ucp_device_type}\n" + f" entrypoint : {entrypoint_name}\n" + f" min_nodes : {config.min_nodes}\n" + f" max_nodes : {config.max_nodes}\n" + f" nproc_per_node : {config.nproc_per_node}\n" + f" run_id : {config.run_id}\n" + f" rdzv_backend : {config.rdzv_backend}\n" + f" rdzv_endpoint : {config.rdzv_endpoint}\n" + f" rdzv_configs : {config.rdzv_configs}\n" + f" max_restarts : {config.max_restarts}\n" + f" monitor_interval : {config.monitor_interval}\n" + f" log_dir : {config.get_log_dir()}\n" + f" metrics_cfg : {config.metrics_cfg}\n" + f" training_log : {config.training_log_file}\n" + f" failure_errors : {config.failure_node_errors}\n" + f" numa_affinity : {config.numa_affinity}\n" + f" accelerator : {config.accelerator}\n" + f" ucp_device_type : {config.ucp_device_type}\n" + f" elastic_dimension : 
{elastic_dimension}\n" ) _agent_evt.start(args=vars(config)) @@ -2005,6 +2039,24 @@ def launch_agent( agent.stop_executor() monitor.stop() +def _auto_param_tuning_by_vertical_elastic(config: ElasticLaunchConfig, args: List[Any]): + logger.info(f"original nproc_per_node: {config.nproc_per_node}") + + # Get resource plan from brain and Auto-tuning train params, eg: nproc_per_node . + _master_client = MasterClient.singleton_instance() + exec_opt_resource_plan_ready = _master_client.exec_opt_res_plan_ready() + logger.info(f"DEBUG: brain_resource_ready = {exec_opt_resource_plan_ready}") + + if exec_opt_resource_plan_ready: + adjust_gpu_nums = _master_client.get_gpus_from_brain_resource_plan() + logger.warning(f"adjust gpu nums from brain is: {adjust_gpu_nums}") + if adjust_gpu_nums > 0 and config.nproc_per_node != adjust_gpu_nums: + config.nproc_per_node = adjust_gpu_nums + # report master + _master_client.set_param_tunning_ready(param_tunning_ready=True) + + logger.info(f"adjust nproc_per_node: {config.nproc_per_node}") + def _create_worker_spec( node_rank: int, diff --git a/dlrover/python/master/args.py b/dlrover/python/master/args.py index 8267e69bd..6d122eb30 100644 --- a/dlrover/python/master/args.py +++ b/dlrover/python/master/args.py @@ -131,6 +131,19 @@ def _build_master_args_parser(): type=pos_int, help="The port of the DLRover dashboard.", ) + parser.add_argument( + "--elastic_dimension", + default="horizontal", + type=str, + help="The type of elastic, should be 'horizontal' or 'vertical'", + ) + parser.add_argument( + "--configmap_manual_scale_switch", + default="off", + type=str, + help="The configmap manual scale switch, should be 'on' or 'off'", + ) + return parser diff --git a/dlrover/python/master/main.py b/dlrover/python/master/main.py index f1e0178f8..5f8fae145 100644 --- a/dlrover/python/master/main.py +++ b/dlrover/python/master/main.py @@ -19,6 +19,7 @@ DistributionStrategy, NodeType, PlatformType, + ElasticDimension, ) from 
dlrover.python.common.event.reporter import get_event_reporter from dlrover.python.common.global_context import Context, DefaultValues @@ -41,6 +42,10 @@ def update_context(job_args: JobArgs): if job_args.distribution_strategy == DistributionStrategy.ALLREDUCE: _dlrover_context.relaunch_always = True _dlrover_context.training_elastic_mode = job_args.training_elastic_mode + + _dlrover_context.elastic_dimension = job_args.elastic_dimension + _dlrover_context.configmap_manual_scale_switch = job_args.configmap_manual_scale_switch + _dlrover_context.set_params_from_brain() _dlrover_context.print_config() @@ -48,6 +53,20 @@ def update_context(job_args: JobArgs): def run(args): job_args = new_job_args(args.platform, args.job_name, args.namespace) job_args.initilize() + + if args.xpu_type.lower() == "ascend": + job_args.xpu_type = Accelerators.ASCEND_NPU + elif args.xpu_type.lower() == "nvidia": + job_args.xpu_type = Accelerators.NVIDIA_GPU + elif args.xpu_type.lower() == "mthreads": + job_args.xpu_type = Accelerators.MTHREADS_GPU + else: + logger.info(f"{args.xpu_type}, use cpu as default") + job_args.xpu_type = Accelerators.GENERIC_CPU + + job_args.elastic_dimension = args.elastic_dimension.lower() + job_args.configmap_manual_scale_switch = args.configmap_manual_scale_switch.lower() + logger.info("Job args : %s", job_args.to_json(indent=4)) _dlrover_context.config_master_port(port=args.port) _dlrover_context.seconds_to_timeout_task_process = ( @@ -73,15 +92,6 @@ def run(args): _dlrover_context.dashboard_port = args.dashboard_port job_args.training_elastic_mode = args.training_elastic_mode - if args.xpu_type.lower() == "ascend": - job_args.xpu_type = Accelerators.ASCEND_NPU - elif args.xpu_type.lower() == "nvidia": - job_args.xpu_type = Accelerators.NVIDIA_GPU - elif args.xpu_type.lower() == "mthreads": - job_args.xpu_type = Accelerators.MTHREADS_GPU - else: - logger.info(f"{args.xpu_type}, use cpu as default") - job_args.xpu_type = Accelerators.GENERIC_CPU if 
job_args.platform == PlatformType.LOCAL: from dlrover.python.master.local_master import LocalJobMaster diff --git a/dlrover/python/master/node/dist_job_manager.py b/dlrover/python/master/node/dist_job_manager.py index 5329c2596..0ddae67cf 100644 --- a/dlrover/python/master/node/dist_job_manager.py +++ b/dlrover/python/master/node/dist_job_manager.py @@ -33,6 +33,9 @@ NodeStatus, NodeType, TrainingExceptionLevel, + OptimizeMode, + ElasticDimension, + WorkerResourceScaleAction, ) from dlrover.python.common.global_context import Context from dlrover.python.common.log import default_logger as logger @@ -52,6 +55,10 @@ JobAutoScaler, new_job_auto_scaler, ) +from dlrover.python.master.node.job_resource_scaler import ( + JobResourceScaler, + new_job_resource_scaler, +) from dlrover.python.master.node.job_context import get_job_context from dlrover.python.master.node.job_manager import JobManager from dlrover.python.master.node.ps import ParameterServerManager @@ -80,6 +87,7 @@ from dlrover.python.master.watcher.factory import ( new_node_watcher, new_scale_plan_watcher, + new_configmap_scale_watcher, ) from dlrover.python.scheduler.factory import new_elastic_job from dlrover.python.scheduler.job import ElasticJob, JobArgs @@ -91,7 +99,7 @@ job_ctx = get_job_context() _MAX_POD_RELAUNCH_COUNT = 5 - +JSON_KEY = "config.json" def is_positive_exit(exit_reason): if exit_reason in [NodeExitReason.DIAG_FAIL, NodeExitReason.NO_HEARTBEAT]: @@ -149,7 +157,11 @@ def __init__( elif job_args.distribution_strategy == DistributionStrategy.ALLREDUCE: self._job_optimizer = AllreduceJobResourceOptimizer( self._job_resource.node_group_resources[NodeType.WORKER], + job_args.optimize_mode, job_args.job_uuid, + job_args.resource_limits, + job_args.job_name, + job_args.namespace, ) else: raise ValueError( @@ -198,6 +210,14 @@ def __init__( self._group_relaunch_count = 0 self._max_group_relaunch_count = _dlrover_context.max_relaunch_count + self._configmap_manual_scaler = 
def handle_configmap_update_callback(self, data: dict):
    """React to an update of the manual-scale ConfigMap.

    The payload is read from ``data[JSON_KEY]`` and parsed as JSON. The
    parsed document must be a dict whose ``job_names`` entry is a
    comma-separated string of job names. Manual resource scaling is
    started only when this job's name appears in that list; every other
    case logs a warning and returns without scaling.

    Args:
        data: The ``data`` mapping delivered by the ConfigMap watcher.
    """
    # Local import: this method must not rely on a module-level ``json``
    # import that is not guaranteed to exist in this file.
    import json

    logger.info("Manual scale ConfigMap changed, will start resource scaling.")
    if not data:
        logger.warning(
            "Manual scale configMap has no data, skip manual resource scaling."
        )
        return

    if JSON_KEY not in data:
        logger.warning(
            f"The key {JSON_KEY} was not found in the manual scale ConfigMap, "
            f"skip manual resource scaling."
        )
        return

    # A hand-edited ConfigMap can easily contain malformed JSON; treat it
    # like any other invalid payload instead of raising in the watcher.
    try:
        manual_config = json.loads(data[JSON_KEY])
    except (TypeError, ValueError) as e:
        logger.warning(
            f"Failed to parse {JSON_KEY} of the manual scale ConfigMap: {e}. "
            f"Skip manual resource scaling"
        )
        return

    if not isinstance(manual_config, dict):
        logger.warning(
            f"ConfigMap data is not a dictionary, type: {type(manual_config)}. "
            f"Skip manual resource scaling"
        )
        return

    job_names = manual_config.get("job_names")
    if not job_names:
        logger.warning(
            "The key 'job_names' was not found in the manual scale ConfigMap, "
            "skip manual resource scaling."
        )
        _dlrover_context.enable_elastic_resource = False
        return
    if not isinstance(job_names, str):
        logger.warning(
            f"The key 'job_names' must be a comma separated string, "
            f"type: {type(job_names)}. Skip manual resource scaling."
        )
        _dlrover_context.enable_elastic_resource = False
        return

    job_list = [name.strip() for name in job_names.split(",")]
    if self._job_args.job_name not in job_list:
        logger.warning(
            f"The job {self._job_args.job_name} is not in the job list "
            f"{job_list}, skip manual resource scaling."
        )
        _dlrover_context.enable_elastic_resource = False
        # Without this return the flag was re-enabled right below and the
        # job was scaled although it is not listed in the ConfigMap.
        return

    _dlrover_context.enable_elastic_resource = True
    self.start_resource_scaling(WorkerResourceScaleAction.MANUAL)
    logger.info("Manual resource scaling finished.")
self._job_resource_scaler.__class__ + ) + def _monitor_nodes(self): logger.info("Start monitoring nodes events.") while True: @@ -1375,6 +1461,10 @@ def start_auto_scaling(self): """Start auto scaling nodes to improve the training throughput.""" self._job_autoscaler.start_auto_scaling() + def start_resource_scaling(self, scale_action: str): + """Start resource scaling nodes to improve the training throughput.""" + self._job_resource_scaler.start_resource_scaling(scale_action) + def _set_ps_addrs_in_plan(self, plan: ScalePlan): ps_addrs = self._ps_manager.get_ps_addrs() plan.ps_addrs.extend(ps_addrs) diff --git a/dlrover/python/master/node/job_manager.py b/dlrover/python/master/node/job_manager.py index 7807e9dff..9cee8d744 100644 --- a/dlrover/python/master/node/job_manager.py +++ b/dlrover/python/master/node/job_manager.py @@ -12,7 +12,7 @@ # limitations under the License. from abc import ABCMeta, abstractmethod -from typing import Dict +from typing import Dict, Optional from dlrover.python.common.constants import TrainingExceptionLevel from dlrover.python.common.event.reporter import get_event_reporter @@ -150,6 +150,26 @@ def remove_training_nodes(self): def start_auto_scaling(self): pass + @abstractmethod + def start_resource_scaling(self, scale_action): + pass + + @abstractmethod + def get_gpus_from_brain_resource_plan(self): + pass + + @abstractmethod + def set_save_ckpt_status(self, save_ckpt_ready: bool, reason: Optional[str] = None): + pass + + @abstractmethod + def exec_opt_res_plan_ready(self): + pass + + @abstractmethod + def set_param_tunning_ready(self, param_tunning_ready: bool): + pass + @abstractmethod def all_running_node_hanged(self): pass diff --git a/dlrover/python/master/node/job_resource_scaler.py b/dlrover/python/master/node/job_resource_scaler.py new file mode 100644 index 000000000..d31da366b --- /dev/null +++ b/dlrover/python/master/node/job_resource_scaler.py @@ -0,0 +1,338 @@ +# Copyright 2023 The DLRover Authors. All rights reserved. 
def new_job_resource_scaler(
    job_strategy,
    job_resource: JobResource,
    job_optimizer: JobResourceOptimizer,
    perf_monitor: PerfMonitor,
    worker_manager: WorkerManager,
    node_scaler: Scaler,
):
    """Create the resource scaler that matches the distribution strategy.

    Args:
        job_strategy: Distribution strategy of the job; only
            ``DistributionStrategy.ALLREDUCE`` is supported.
        job_resource: Resource description of the job's node groups.
        job_optimizer: Optimizer that produces resource plans.
        perf_monitor: Performance monitor of the job.
        worker_manager: Manager of the job's worker nodes.
        node_scaler: Scaler that applies scale plans.

    Returns:
        An ``AllreduceTrainingResourceScaler`` for all-reduce jobs.

    Raises:
        ValueError: If no resource scaler exists for ``job_strategy``.
    """
    if job_strategy != DistributionStrategy.ALLREDUCE:
        # Format the message eagerly: ValueError does not interpolate
        # logging-style ("%s", arg) arguments.
        raise ValueError(f"No job resource scaler for {job_strategy}")

    return AllreduceTrainingResourceScaler(
        job_resource,
        job_optimizer,
        perf_monitor,
        worker_manager,
        node_scaler,
    )
JobResourceScaler(metaclass=ABCMeta): + """JobResourceScaler scale up/down nodes resource of job.""" + + def __init__( + self, + job_resource: JobResource, + job_optimizer: JobResourceOptimizer, + perf_monitor: PerfMonitor, + node_scaler: Scaler, + scale_interval: int, + ): + self._job_resource = job_resource + self._job_optimizer = job_optimizer + self._perf_monitor = perf_monitor + self._scaler = node_scaler + self._scale_interval = scale_interval + self._job_context = get_job_context() + + self._suggested_stop = False + self._autoscaling_started = False + + self._exec_opt_resource_ready = False + self._save_ckpt_status = False + + def suggested_stop(self): + return self._suggested_stop + + @abstractmethod + def start_resource_scaling(self, scale_action: str): + """Start vertical elastic nodes of a job""" + pass + + @abstractmethod + def execute_job_optimization_plan(self, plan: ResourcePlan): + """Scale nodes of a job by a ResourcePlan""" + if plan and not plan.empty(): + logger.info(f"Execute job optimization plan: {plan.to_json()}.") + + @abstractmethod + def exec_opt_res_plan_ready(self): + """Set the status of the job to start executing the optimization plan""" + pass + + @abstractmethod + def set_save_ckpt_status_by_vertical_elastic(self, save_ckpt_ready: bool): + """set save_ckpt_status by vertical elastic""" + pass + + @abstractmethod + def set_param_tunning_ready(self, param_tunning_ready: bool): + """set param tunning ready by vertical elastic""" + pass + +class AllreduceTrainingResourceScaler(JobResourceScaler): + """Scale a Job resource with Allreduce strategy""" + + def __init__( + self, + job_resource: JobResource, + job_optimizer: JobResourceOptimizer, + perf_monitor: PerfMonitor, + worker_manager: WorkerManager, + node_scaler: Scaler, + ) -> None: + super().__init__( + job_resource, + job_optimizer, + perf_monitor, + node_scaler, + 1800, + ) + self._worker_manager = worker_manager + self.scale_action: str = "" + self.last_plan_time = 0 + 
def _adjust_running_worker_resource(self):
    """Run one vertical-elastic optimization round for the workers.

    The throttling parameters are read from the global context. A round
    is skipped when the round budget (``vertical_elastic_max_opt_num``)
    is used up, or when the previous round finished less than
    ``vertical_elastic_opt_interval`` seconds ago and the manual ConfigMap
    switch is not "on". A successful round increments ``opt_num`` and
    refreshes ``last_plan_time``. Any exception is logged and clears the
    "plan ready" flag.
    """
    opt_interval = _dlrover_context.vertical_elastic_opt_interval
    max_opt_num = _dlrover_context.vertical_elastic_max_opt_num
    ckpt_wait_time = _dlrover_context.ckpt_save_status_check_max_wait_time
    ckpt_check_interval = _dlrover_context.ckpt_save_status_check_interval
    log_step = _dlrover_context.log_interval_check_nums
    logger.info(
        f"vertical elastic control params:\n"
        f" vertical elastic opt_interval is: {opt_interval}\n"
        f" vertical elastic max_opt_num is: {max_opt_num}\n"
        f" ckpt_save_status_check_max_wait_time is: {ckpt_wait_time}\n"
        f" ckpt_save_status_check_interval is: {ckpt_check_interval}\n"
        f" log_interval_check_nums is: {log_step}\n"
    )

    try:
        # Control the interval to query plans
        current_time = time.time()
        elapsed = current_time - self.last_plan_time

        logger.info(
            f"Elapsed: {elapsed:.1f}s,"
            f"current_time: {current_time}s,"
            f"last_plan_time: {self.last_plan_time}s,"
            f"opt_interval Config: {opt_interval}s,"
            f"Diff: {opt_interval - elapsed:.1f}s"
        )

        # opt_num counts the rounds that already completed, so the budget
        # is exhausted as soon as it reaches max_opt_num ('>' would allow
        # one round more than configured).
        if self.opt_num >= max_opt_num:
            logger.warning(
                f"Reached max optimization attempts ({max_opt_num}). "
                f"Skip scaling."
            )
            return

        manual_scale_switch = _dlrover_context.configmap_manual_scale_switch
        if elapsed <= opt_interval:
            logger.warning(
                f"The time elapsed since the last execution is too short; "
                f"Manual scale switch is {manual_scale_switch}."
            )
            # Manual scaling is an explicit operator request and is not
            # throttled by the optimization interval.
            if manual_scale_switch != "on":
                logger.warning("Skip scaling.")
                return
            logger.warning("Continue manual scaling.")

        if not self._try_execute_optimization(
            ckpt_wait_time, ckpt_check_interval, log_step
        ):
            return

        self.opt_num += 1
        self.last_plan_time = time.time()

        logger.info(
            f"Optimization round {self.opt_num} completed successfully."
        )
    except Exception as e:
        logger.error(
            f"Failed to worker resource scale for AllReduce Training: {e}"
        )
        self._exec_opt_resource_ready = False
def _wait_for_ckpt_ready(self, max_wait, check_interval, log_step):
    """Block until the checkpoint-saved flag is set.

    Polls ``self._save_ckpt_status`` every ``check_interval`` seconds.

    Args:
        max_wait: Maximum number of seconds to wait.
        check_interval: Seconds to sleep between two polls.
        log_step: Emit a progress log every ``log_step`` polls; values
            less than or equal to 0 disable the progress log.

    Returns:
        True once the flag is set; False when elastic resource scaling
        gets disabled while waiting or when ``max_wait`` is exceeded.
    """
    # A monotonic clock keeps the timeout immune to wall-clock changes.
    started_at = time.monotonic()
    # time.sleep() rejects negative values; treat them as "no pause".
    pause = max(check_interval, 0)
    polls = 0

    while not self._save_ckpt_status:
        if not _dlrover_context.enable_elastic_resource:
            logger.warning(
                f"Enable elastic resource is "
                f"{_dlrover_context.enable_elastic_resource}, "
                f"Resource Scaling stopped while waiting for CKPT."
            )
            return False

        if (time.monotonic() - started_at) > max_wait:
            logger.warning("Waiting for CKPT save timed out.")
            return False

        polls += 1
        # Guard the modulo: a log step of 0 would raise ZeroDivisionError.
        if log_step > 0 and polls % log_step == 0:
            logger.info(
                f"Waiting for CKPT... ({time.monotonic() - started_at:.1f}s)"
            )

        time.sleep(pause)

    logger.info("CKPT save completed. Proceeding with optimization.")
    return True
def get_worker_plan_with_vertical_elastic(self, worker_resource: NodeGroupResource):
    """Build the scale plan that applies ``worker_resource`` vertically.

    The worker manager is asked for a "scale_down" plan first and for a
    "scale_up" plan second. The returned plan is the scale-up plan with
    the nodes to remove from the scale-down plan appended to its
    ``remove_nodes``.

    Args:
        worker_resource: Target resource of the worker node group.

    Returns:
        The merged ``ScalePlan``.
    """
    logger.info("[AllReduce] start to adjust worker resource by vertical elastic")

    scale_down_plan = self._worker_manager.adjust_worker_by_scale_operate(
        worker_resource, "scale_down"
    )
    scale_up_plan = self._worker_manager.adjust_worker_by_scale_operate(
        worker_resource, "scale_up"
    )

    # NOTE(review): the scale-down result is already treated as possibly
    # empty below; presumably the scale-up call can come back empty as
    # well, so fall back to an empty plan instead of dereferencing None.
    worker_plan = scale_up_plan if scale_up_plan is not None else ScalePlan()

    if scale_down_plan:
        worker_plan.remove_nodes.extend(scale_down_plan.remove_nodes)
    logger.info(
        "[AllReduce] after adjust worker, worker plan is: %s",
        worker_plan.to_json(),
    )
    return worker_plan
self._worker_manager.adjust_worker(worker_resource) \ No newline at end of file diff --git a/dlrover/python/master/node/local_job_manager.py b/dlrover/python/master/node/local_job_manager.py index 0cfde8693..f939eee89 100644 --- a/dlrover/python/master/node/local_job_manager.py +++ b/dlrover/python/master/node/local_job_manager.py @@ -10,7 +10,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +from typing import Optional from dlrover.python.common.comm import ParallelConfig from dlrover.python.common.constants import NodeStatus, NodeType from dlrover.python.common.node import Node @@ -140,6 +140,21 @@ def remove_training_nodes(self): def start_auto_scaling(self): pass + def start_resource_scaling(self, scale_action): + pass + + def get_gpus_from_brain_resource_plan(self): + pass + + def set_save_ckpt_status(self, save_ckpt_ready: bool, reason: Optional[str] = None): + pass + + def exec_opt_res_plan_ready(self): + pass + + def set_param_tunning_ready(self, param_tunning_ready: bool): + pass + def all_running_node_hanged(self): return False diff --git a/dlrover/python/master/node/ps.py b/dlrover/python/master/node/ps.py index 0ff1f5531..ad54a0a10 100644 --- a/dlrover/python/master/node/ps.py +++ b/dlrover/python/master/node/ps.py @@ -156,7 +156,7 @@ def _scale_down_ps(self, down_num): self._ps_cluster_changed = True new_ps_num = self._job_resource.ps_num - down_num self._job_resource.update_node_group_resource( - NodeType.PS, new_ps_num, 0, 0 + NodeType.PS, new_ps_num, 0, 0, "", 0 ) running_ps = self._get_alive_ps() for node in reversed(running_ps): diff --git a/dlrover/python/master/node/worker.py b/dlrover/python/master/node/worker.py index 58d3c4cd6..21dd72fee 100644 --- a/dlrover/python/master/node/worker.py +++ b/dlrover/python/master/node/worker.py @@ -138,6 +138,8 @@ def adjust_worker(self, worker_resource: NodeGroupResource): 
num, worker_resource.node_resource.cpu, worker_resource.node_resource.memory, + worker_resource.node_resource.gpu_type, + worker_resource.node_resource.gpu_num, ) ) alive_workers = [] @@ -153,6 +155,33 @@ def adjust_worker(self, worker_resource: NodeGroupResource): plan = self._scale_down_workers(alive_num - num, alive_workers) return plan + def adjust_worker_by_scale_operate(self, worker_resource: NodeGroupResource, scale_operate: str): + plan = ScalePlan() + num = worker_resource.count + logger.info( + "Adjust worker resource to {}, {}, {}, {}, {}".format( + num, + worker_resource.node_resource.cpu, + worker_resource.node_resource.memory, + worker_resource.node_resource.gpu_type, + worker_resource.node_resource.gpu_num + ) + ) + alive_workers = [] + nodes = self._get_mutable_nodes() + for worker in nodes.values(): + if worker.status in ALIVE_STATUS: + alive_workers.append(worker) + alive_num = len(alive_workers) + logger.info("Adjust worker, alive_num = %s", alive_num) + + with self._lock: + if scale_operate.strip() == "scale_up": + plan = self._scale_up_workers(num) + elif scale_operate.strip() == "scale_down": + plan = self._scale_down_workers(alive_num, alive_workers) + return plan + def _scale_up_workers(self, up_num): """Launch up_num workers.""" plan = ScalePlan() diff --git a/dlrover/python/master/resource/job.py b/dlrover/python/master/resource/job.py index f32d8e31d..be7ec4aa7 100644 --- a/dlrover/python/master/resource/job.py +++ b/dlrover/python/master/resource/job.py @@ -24,6 +24,9 @@ NodeType, OptimizeMode, OptimizeWorkerPhase, + OptimizerType, + WorkerResourceScaleAction, + ElasticDimension, ) from dlrover.python.common.global_context import Context from dlrover.python.common.log import default_logger as logger @@ -32,6 +35,12 @@ from dlrover.python.master.resource.brain_optimizer import ( BrainResoureOptimizer, ) +from dlrover.python.master.resource.py_brain_optimizer import ( + PyBrainResoureOptimizer, +) +from dlrover.brain.python.common.job import ( +
OptimizeConfig, +) from dlrover.python.master.resource.local_optimizer import PSLocalOptimizer from dlrover.python.master.resource.optimizer import ( ResourcePlan, @@ -67,6 +76,25 @@ def new_ps_resource_optimizer( ) return SimpleOptimizer(job_uuid, resource_limits) +def new_allreduce_resource_optimizer( + optimize_mode: str, job_uuid, resource_limits: ResourceLimits, job_name, namespace +): + """ + In the allreduce scenario, create a suitable resource optimizer instance based on the optimization mode. + + """ + logger.info("New %s allreduce resource optimizer for job %s(%s)", optimize_mode, job_name, job_uuid) + + if optimize_mode == OptimizeMode.CLUSTER: + return PyBrainResoureOptimizer(job_uuid, job_name, "prod-cluster", namespace, resource_limits) + elif optimize_mode == OptimizeMode.SINGLE_JOB: + return SimpleOptimizer(job_uuid, resource_limits) + else: + logger.warning( + "Not support optimization mode %s, use a simple optimizer", + optimize_mode, + ) + return SimpleOptimizer(job_uuid, resource_limits) class JobResource(JsonSerializable): def __init__(self): @@ -83,7 +111,7 @@ def _get_group_node_num(self, node_type): def get_node_types(self): return list(self.node_group_resources.keys()) - def update_node_group_resource(self, node_type, num, cpu, memory): + def update_node_group_resource(self, node_type, num, cpu, memory, gpu_type="", gpu_num=0): self.node_group_resources.setdefault( node_type, NodeGroupResource( @@ -95,6 +123,8 @@ def update_node_group_resource(self, node_type, num, cpu, memory): resource.count = num or resource.count resource.node_resource.cpu = cpu or resource.node_resource.cpu resource.node_resource.memory = memory or resource.node_resource.memory + resource.node_resource.gpu_type = gpu_type or resource.node_resource.gpu_type + resource.node_resource.gpu_num = gpu_num or resource.node_resource.gpu_num @property def worker_num(self): @@ -179,7 +209,7 @@ def init_job_resource(self, job_resource: JobResource): pass @abstractmethod - def
get_job_resource_plan(self) -> ResourcePlan: + def get_job_resource_plan(self, scale_action: str) -> ResourcePlan: """Get resource plan for a job.""" pass @@ -192,6 +222,11 @@ def adjust_oom_resource(self, node: Node): def get_config_resource(self) -> JobResource: pass + @abstractmethod + def get_brain_resource_plan_opt_gpus(self): + """Get resource plan for a job.""" + pass + class PSJobResourceOptimizer(JobResourceOptimizer): """It generates resource configuration for a PS job.""" @@ -281,6 +316,8 @@ def init_job_resource(self, job_resource: JobResource): self._worker_resource.count, self._worker_resource.node_resource.cpu, self._worker_resource.node_resource.memory, + self._worker_resource.node_resource.gpu_type, + self._worker_resource.node_resource.gpu_num, ) job_resource.update_node_group_resource( @@ -288,6 +325,8 @@ def init_job_resource(self, job_resource: JobResource): self._ps_resource.count, self._ps_resource.node_resource.cpu, self._ps_resource.node_resource.memory, + "", + 0, ) evaluator_group = job_resource.get_node_group_resource( @@ -393,7 +432,7 @@ def _adjust_oom_ps_resource(self, node: Node): ) self._last_ps_change_time = time.time() - def get_job_resource_plan(self): + def get_job_resource_plan(self, scale_action: str): plan = None if self._job_stage == JobOptStage.WORKER_INITIAL: plan = self._get_worker_resource_at_init_phase() @@ -416,6 +455,9 @@ def get_job_resource_plan(self): plan.adjust_plan_by_context() return plan + + def get_brain_resource_plan_opt_gpus(self): + pass def _get_worker_resource_at_running(self): if not self.optimize_worker_sampled: @@ -519,14 +561,25 @@ class AllreduceJobResourceOptimizer(JobResourceOptimizer): def __init__( self, worker_resource: NodeGroupResource, - job_uuid="", + optimize_mode: str, + job_uuid = "", + resource_limits = ResourceLimits(), + job_name = "", + namespace = "", ): self._worker_resource = worker_resource self._original_worker_resource = copy.deepcopy(self._worker_resource) + + if 
_dlrover_context.elastic_dimension == ElasticDimension.VERTICAL: + self._resource_optimizer = new_allreduce_resource_optimizer( + optimize_mode, job_uuid, resource_limits, job_name, namespace + ) + self._job_uuid = job_uuid self._lock = threading.Lock() self._node_unit = 1 self._alive_node_num = 0 + self._opt_gpu__nums = 0 def update_job_uuid(self, job_uuid): pass @@ -534,7 +587,71 @@ def update_job_uuid(self, job_uuid): def init_job_resource(self, job_resource: JobResource): pass - def get_job_resource_plan(self) -> ResourcePlan: + def get_job_resource_plan(self, scale_action: str) -> ResourcePlan: + if WorkerResourceScaleAction.NOT_ADJUST == scale_action: + return self._get_job_resource_plan_by_original() + + if scale_action == WorkerResourceScaleAction.MANUAL: + optimize_config = self._gen_optimize_config(OptimizerType.MANUAL_OPTIMIZER) + else: + optimize_config = self._gen_optimize_config(OptimizerType.BASE_OPTIMIZER) + + plan = self._resource_optimizer.generate_resource_plan_with_optimizer(optimize_config) + + if not plan or plan.empty(): + logger.warning("Brain resource plan is empty, Use the original plan to start the job") + _dlrover_context.enable_elastic_resource = False + return self._get_job_resource_plan_by_original() + + _dlrover_context.enable_elastic_resource = True + + worker_resources = plan.node_group_resources.get(NodeType.WORKER) + if not worker_resources: + logger.error("Generated plan does not contain WORKER node group. 
Falling back to original plan.") + _dlrover_context.enable_elastic_resource = False + return self._get_job_resource_plan_by_original() + self._opt_gpu__nums = plan.node_group_resources[NodeType.WORKER].node_resource.gpu_num + self._verify_worker_optimized_group_resource(plan) + + return plan + + def get_worker_resource_config(self): + pass + + def get_brain_resource_plan_opt_gpus(self): + return self._opt_gpu__nums + + def _verify_worker_optimized_group_resource(self, plan: ResourcePlan): + group = plan.node_group_resources[NodeType.WORKER] + group = self._check_ignore_original_worker_resource(group) + return group + + def _check_ignore_original_worker_resource( + self, resource: NodeGroupResource + ): + """Abandon the optimization result if users have set the resource.""" + # Users may worry about that the increasing number of worker hurts the + # accuracy, so the max number of worker is the configuration. + original_resource = self._original_worker_resource.node_resource + if self._original_worker_resource.count > 0: + resource.count = self._original_worker_resource.count + if resource.node_resource.cpu == 0: + resource.node_resource.cpu = original_resource.cpu + if resource.node_resource.memory == 0: + resource.node_resource.memory = original_resource.memory + if resource.node_resource.gpu_num == 0: + resource.node_resource.gpu_num = original_resource.gpu_num + if not resource.node_resource.gpu_type: + resource.node_resource.gpu_type = original_resource.gpu_type + return resource + + def _gen_optimize_config(self, optimizer="", customized_config=None): + optimize_config: OptimizeConfig = OptimizeConfig() + optimize_config.optimizer = optimizer + optimize_config.customized_config = {} if customized_config is None else customized_config + return optimize_config + + def _get_job_resource_plan_by_original(self): """Check whether there are free nodes in the cluster.""" plan = ResourcePlan() worker_config = copy.deepcopy(self._original_worker_resource) diff --git
a/dlrover/python/master/resource/optimizer.py b/dlrover/python/master/resource/optimizer.py index 16f70b2d3..577979819 100644 --- a/dlrover/python/master/resource/optimizer.py +++ b/dlrover/python/master/resource/optimizer.py @@ -132,7 +132,7 @@ def new_default_plan(cls): class ResourceOptimizer(metaclass=ABCMeta): - def __init__(self, job_uuid, resource_limits: ResourceLimits): + def __init__(self, job_uuid, resource_limits: ResourceLimits, **kwargs): self._job_uuid = job_uuid self._resource_limits = resource_limits diff --git a/dlrover/python/master/resource/py_brain_optimizer.py b/dlrover/python/master/resource/py_brain_optimizer.py new file mode 100644 index 000000000..c28876601 --- /dev/null +++ b/dlrover/python/master/resource/py_brain_optimizer.py @@ -0,0 +1,152 @@ +# Copyright 2022 The DLRover Authors. All rights reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from distutils.log import log +from logging import exception +import math +import uuid +import os +import time + +import requests +from dlrover.brain.python.client.client import BrainClient + +from dlrover.python.common.constants import MemoryUnit, NodeType +from dlrover.python.common.log import default_logger as logger +from dlrover.python.common.node import NodeGroupResource, NodeResource +from dlrover.python.master.resource.optimizer import ( + ResourceOptimizer, + ResourcePlan, +) + +from dlrover.brain.python.common.http_schemas import ( + OptimizeRequest, + OptimizeResponse, + Response, +) +from dlrover.brain.python.common.job import ( + JobMeta, + JobOptimizePlan, +) +from concurrent.futures import ThreadPoolExecutor, TimeoutError + +def catch_brain_optimization_exception(func): + def wrapper(self, *args, **kwargs): + try: + return func(self, *args, **kwargs) + except Exception as e: + logger.warning("Fail to execute %s by %s", func.__name__, e) + return ResourcePlan() + + return wrapper + + +def convert_plan_msg(plan_msg): + """ + + """ + plan = ResourcePlan() + if not plan_msg: + return plan + for type, group in plan_msg.job_resource.node_group_resources.items(): + if type != NodeType.WORKER: + continue + count = group.count + memory = int(group.resource.memory/MemoryUnit.MB) # MiB + cpu = math.ceil(group.resource.cpu) + gpu_num = int(group.resource.gpu) + gpu_type = group.resource.gpu_type + priority = group.resource.priority + logger.info(f"type: {type}, count: {count}, memory: {memory}Mi, cpu: {cpu}, gpu: {gpu_num}, gpu_type: {gpu_type}, priority: {priority}") + plan.node_group_resources[type] = NodeGroupResource( + count, NodeResource(cpu, memory, gpu_type, gpu_num, priority) + ) + return plan + + +class PyBrainResoureOptimizer(ResourceOptimizer): + """Query resource plan from the brain service. 
+ request: + { + "type": "standard", + "job": { + "uuid": "job-uuid-123", + "cluster": "prod-cluster", + "namespace": "default" + }, + "config": { + "optimizer": "BaseOptimizer", + "customized_config": {"pop_size": "50"} + } + } + """ + + def __init__(self, job_uuid, job_name, cluster, namespace, resource_limits): + super(PyBrainResoureOptimizer, self).__init__(job_uuid, resource_limits) + self.job_name = job_name + self.cluster = cluster + self.namespace = namespace + self.brain_server_address = self.get_brain_server_address() + self._brain_client = BrainClient(self.brain_server_address) + + def get_brain_server_address(self): + """ + Get the brain server address from the environment variable. + example: "http://dlrover-brain.dlrover.svc.cluster.local:50002" + """ + service_name = os.getenv("DLROVER_BRAIN_SERVICE_NAME", "dlrover-brain") + brain_service_port = int(os.getenv("DLROVER_BRAIN_SERVICE_PORT", "50002")) + return "http://%s.%s.svc.cluster.local:%d" % ( + service_name, + self.namespace, + brain_service_port, + ) + + @catch_brain_optimization_exception + def generate_opt_plan(self, stage, config={}) -> ResourcePlan: + pass + + @catch_brain_optimization_exception + def generate_oom_recovery_plan(self, oom_nodes, stage, config={}): + pass + + @catch_brain_optimization_exception + def generate_resource_plan_with_optimizer(self, config): + job = JobMeta(uuid=self._job_uuid, cluster=self.cluster, namespace=self.namespace) + req_data = OptimizeRequest(job=job, config=config) + + logger.info(f"Request brain server with url {self.brain_server_address}") + # interface timeout + timeout_seconds = int(os.getenv("BRAIN_INTERFACE_TIMEOUT_SECONDS", "60")) + + # asyn request brain optimize + with ThreadPoolExecutor(max_workers=3) as executor: + future = executor.submit(self._brain_client.optimize, req_data) + try: + res: OptimizeResponse = future.result(timeout=timeout_seconds) + except TimeoutError: + logger.error("Request to brain server timed out after %d seconds", 
timeout_seconds) + return ResourcePlan() + + if not res or not res.job_opt_plan: + logger.warning("No any optimization plan") + return ResourcePlan() + + plan_msg = res.job_opt_plan + logger.info( + "The optimization plan with config %s is %s", + config, + plan_msg, + ) + plan = convert_plan_msg(plan_msg) + return plan diff --git a/dlrover/python/master/servicer.py b/dlrover/python/master/servicer.py index f64e85df3..042531cab 100644 --- a/dlrover/python/master/servicer.py +++ b/dlrover/python/master/servicer.py @@ -202,11 +202,23 @@ def get(self, request, _): ) elif isinstance(req_message, comm.HeartBeat): message = self._report_heartbeat(node_type, node_id, req_message) + elif isinstance(req_message, comm.ExecBrainResourcePlanRequest): + message = self.exec_opt_res_plan_ready(req_message) + elif isinstance(req_message, comm.BrainOptGpusRequest): + message = self.get_gpus_from_brain_resource_plan(req_message) if message: response.data = message.serialize() return response + def exec_opt_res_plan_ready(self, req_message): + status: bool = self._job_manager.exec_opt_res_plan_ready() + return comm.ExecBrainResourcePlanReady(status) + + def get_gpus_from_brain_resource_plan(self, req_message): + gpu_nums = self._job_manager.get_gpus_from_brain_resource_plan() + return comm.BrainOptGpus(gpu_nums) + def _get_task(self, node_type, node_id, request: comm.TaskRequest): if not self._start_training_time: self._start_training_time = int(time.time()) @@ -495,10 +507,22 @@ def report(self, request, _): success = self.set_rdzv_blocked(message) elif isinstance(message, comm.DiagnosisAction): success = self._report_action(message) + elif isinstance(message, comm.SaveCheckpointReady): + success = self.set_save_ckpt_status(message) + elif isinstance(message, comm.ParamTunningReady): + success = self.set_param_tunning_ready(message) response.success = success return response + def set_save_ckpt_status(self, message: comm.SaveCheckpointReady): +
self._job_manager.set_save_ckpt_status(message.ckpt_save_ready, message.reason) + return True + + def set_param_tunning_ready(self, message: comm.ParamTunningReady): + self._job_manager.set_param_tunning_ready(message.param_tunning_ready) + return True + def set_rdzv_blocked(self, message: comm.RdzvBlocked): rdzv_manager = self._rdzv_managers[RendezvousName.TRAINING] rdzv_manager.set_rdzv_blocked(message.blocked, message.reason) diff --git a/dlrover/python/master/watcher/factory.py b/dlrover/python/master/watcher/factory.py index cf019090d..824fc2e52 100644 --- a/dlrover/python/master/watcher/factory.py +++ b/dlrover/python/master/watcher/factory.py @@ -10,6 +10,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import os +from typing import Callable from dlrover.python.common.constants import PlatformType from dlrover.python.common.log import default_logger as logger @@ -57,3 +59,22 @@ def new_elasticjob_watcher(args): return K8sElasticJobWatcher(args) else: logger.info(f"Skip elasticjob watcher for engine {args.platform}") + +def new_configmap_scale_watcher(platform, job_name, namespace, job_uuid, on_update_callback: Callable[[dict], None]): + logger.info("New %s configmap scale Watcher", platform) + if platform in (PlatformType.KUBERNETES, PlatformType.PY_KUBERNETES): + from dlrover.python.master.watcher.k8s_watcher import ( + K8sConfigMapScaleWatcher, + ) + configmap_name = os.getenv( + "DLROVER_MANUAL_SCALE_CONFIGMAP_NAME", "brain-manual-scale-config" + ).lower() + return K8sConfigMapScaleWatcher(job_name, namespace, job_uuid, configmap_name, on_update_callback) + elif platform in (PlatformType.RAY,): + from dlrover.python.master.watcher.ray_watcher import ( + RayScalePlanWatcher, + ) + + return RayScalePlanWatcher(job_name, namespace, job_uuid) + else: + raise ValueError("Not support engine %s" % platform) \ No newline at end of
file diff --git a/dlrover/python/master/watcher/k8s_watcher.py b/dlrover/python/master/watcher/k8s_watcher.py index 443ba165d..703e2eded 100644 --- a/dlrover/python/master/watcher/k8s_watcher.py +++ b/dlrover/python/master/watcher/k8s_watcher.py @@ -15,7 +15,7 @@ import threading from datetime import datetime from time import sleep -from typing import List +from typing import List, Callable from kubernetes import client, watch @@ -512,3 +512,62 @@ def start(self): threading.Thread( target=self.watch, name="job-watcher", daemon=True ).start() + +class K8sConfigMapScaleWatcher: + """ConfigMapScaleWatcher monitors the manual brain configmap on the cluster. + It generates a ResourcePlan by a brain configmap and notidy the + JobManager to adjust job resource by the brain ResourcePlan. + """ + + def __init__(self, job_name, namespace, job_uid, configmap_name, on_update_callback: Callable[[dict], None]): + self._namespace = namespace + self._job_name = job_name + self._job_uid = job_uid + self._configmap_name = configmap_name + self.on_update_callback = on_update_callback + self._k8s_client = k8sClient.singleton_instance(namespace) + + def watch(self): + resource_version = None + + while True: + try: + w = watch.Watch() + v1 = self._k8s_client.client + stream_kwargs = { + "namespace": self._namespace, + "field_selector": f"metadata.name={self._configmap_name}", + "timeout_seconds": 0 # 0 means "listen as long as possible" + } + + if resource_version: + stream_kwargs["resource_version"] = resource_version + + for event in w.stream(v1.list_namespaced_config_map, **stream_kwargs): + obj = event['object'] + event_type = event['type'] + + resource_version = obj.metadata.resource_version + + if event_type == "MODIFIED" or event_type == "ADDED": + if obj.data: + logger.info(f"Got event {event_type} for configmap {obj.metadata.name}, will trigger manual scale.") + self.on_update_callback(obj.data) + sleep(5) + except Exception as e: + logger.warning(f"Watch connection broken 
({e}). Retrying in 5 seconds...") + sleep(5) + + def start(self): + logger.info("Starting configmap scale watcher...") + if _dlrover_context.configmap_manual_scale_switch == "off": + logger.warning("Configmap manual scale switch is off, will not watch configmap.") + return + thread = threading.Thread( + target=self.watch, + name="Configmap-scale-watcher", + daemon=True + ) + thread.start() + if thread.is_alive(): + logger.info("ConfigMap scale watcher initialized successfully.") \ No newline at end of file diff --git a/dlrover/python/scheduler/job.py b/dlrover/python/scheduler/job.py index af2662631..bd6fec4ac 100644 --- a/dlrover/python/scheduler/job.py +++ b/dlrover/python/scheduler/job.py @@ -18,6 +18,7 @@ Accelerators, DistributionStrategy, NodeType, + ElasticDimension, ) from dlrover.python.common.node import NodeGroupResource, NodeResource from dlrover.python.common.serialize import JsonSerializable @@ -109,6 +110,9 @@ def __init__(self, platform, namespace, job_name): self.enable_suspended = False self.training_elastic_mode = "base" + self.elastic_dimension = ElasticDimension.HORIZONTAL + self.configmap_manual_scale_switch = "off" + @abstractmethod def initilize(self): pass diff --git a/dlrover/python/scheduler/kubernetes.py b/dlrover/python/scheduler/kubernetes.py index 921ae0c49..2362a459e 100644 --- a/dlrover/python/scheduler/kubernetes.py +++ b/dlrover/python/scheduler/kubernetes.py @@ -85,19 +85,35 @@ def set_container_resource( requests=res_requests.to_resource_dict(), limits=res_limits.to_resource_dict(), ) + logger.info(f"[Set container resource] container resource is none, will use new resource.\n" + f" container limits: {res_limits.to_resource_dict()}\n" + f" container requests: {res_requests.to_resource_dict()}\n" + ) else: res = container.resources if res.requests: res.requests["cpu"] = res_requests.cpu res.requests["memory"] = f"{res_requests.memory}Mi" + res.requests[res_requests.gpu_type] = res_requests.gpu_num else: res.requests = 
res_requests.to_resource_dict() if res.limits: res.limits["cpu"] = res_limits.cpu res.limits["memory"] = f"{res_limits.memory}Mi" + res.limits[res_limits.gpu_type] = res_limits.gpu_num else: res.limits = res_limits.to_resource_dict() + logger.info(f"[Set container resource] container resource is not none, will adjust limits and requests resource.\n" + f"container limits resource:\n" + f" limits cpu: {res_limits.cpu}\n" + f" limits memory: {res_limits.memory}Mi\n" + f" limits {res_limits.gpu_type}: {res_limits.gpu_num}\n" + f"container requests resource:\n" + f" requests cpu: {res_requests.cpu}\n" + f" requests memory: {res_requests.memory}Mi\n" + f" requests {res_requests.gpu_type}: {res_requests.gpu_num}\n" + ) def retry_k8s_request(func): @@ -466,6 +482,12 @@ def initilize(self): if "nvidia.com" in k: gpu_type = k gpu_num = int(v) + if "huawei.com" in k: + gpu_type = k + gpu_num = int(v) + if "metax-tech.com" in k: + gpu_type = k + gpu_num = int(v) group_resource = NodeGroupResource( num, NodeResource(