diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py index e82f76c86..8e297ff85 100644 --- a/dask_kubernetes/operator/controller/controller.py +++ b/dask_kubernetes/operator/controller/controller.py @@ -87,6 +87,14 @@ def build_scheduler_service_spec(cluster_name, spec, annotations, labels): "dask.org/component": "scheduler", } ) + + if spec.get("type") == "NodePort": + try: + node_port = spec["ports"]["nodePort"] + assert node_port in range(30000, 32768) + except ValueError as e: + raise ValueError(f"invalid port '{node_port}' out of range") from e + return { "apiVersion": "v1", "kind": "Service", diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py index de8c8a07a..0c7b7b02b 100644 --- a/dask_kubernetes/operator/kubecluster/kubecluster.py +++ b/dask_kubernetes/operator/kubecluster/kubecluster.py @@ -230,6 +230,22 @@ def __init__( if isinstance(self.worker_command, str): self.worker_command = self.worker_command.split(" ") + if self.n_workers is not None and not isinstance(self.n_workers, int): + raise TypeError(f"n_workers must be an integer, got {type(self.n_workers)}") + + try: + # Validate `resources` param is a dictionary whose + # keys must either be 'limits' or 'requests' + assert isinstance(self.resources, dict) + + for field in self.resources: + if field in ("limits", "requests"): + assert isinstance(self.resources[field], dict) + else: + raise ValueError(f"Unknown field '{field}' in resources") + except TypeError as e: + raise TypeError(f"invalid '{type(resources)}' for resources type") from e + name = name.format( user=getpass.getuser(), uuid=str(uuid.uuid4())[:10], **os.environ ) diff --git a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py index 8ba788e59..3a5668092 100644 --- a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py +++ b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py @@ -153,3 +153,22 @@ def test_custom_spec(kopf_runner, docker_image): with KubeCluster(custom_cluster_spec=spec, n_workers=1) as cluster: with Client(cluster) as client: assert client.submit(lambda x: x + 1, 10).result() == 11 + + +def test_for_noninteger_n_workers(kopf_runner): + with kopf_runner: + with pytest.raises(TypeError, match="n_workers must be an integer"): + KubeCluster(name="foo", n_workers="1") + + +def test_typo_resource_limits(kopf_runner): + with kopf_runner: + with pytest.raises(ValueError): + KubeCluster( + name="foo", + resources={ + "limit": { # <-- Typo, should be `limits` + "CPU": "1", + }, + }, + )