diff --git a/dask_kubernetes/operator/operator.py b/dask_kubernetes/operator/operator.py index 826b67829..43261e53d 100644 --- a/dask_kubernetes/operator/operator.py +++ b/dask_kubernetes/operator/operator.py @@ -1,4 +1,7 @@ import asyncio +import copy + +from distributed.core import rpc import aiohttp from contextlib import suppress @@ -15,36 +18,59 @@ ) -def build_scheduler_pod_spec(name, spec): - return { +def build_scheduler_pod_spec(cluster_name, spec): + scheduler_name = f"{cluster_name}-scheduler" + pod_spec = { "apiVersion": "v1", "kind": "Pod", "metadata": { - "name": f"{name}-scheduler", + "name": scheduler_name, "labels": { - "dask.org/cluster-name": name, + "dask.org/cluster-name": cluster_name, "dask.org/component": "scheduler", - "sidecar.istio.io/inject": "false", + "app": "scheduler", + "version": "v1", }, }, "spec": spec, } + pod_spec["spec"]["serviceAccountName"] = f"{scheduler_name}-service" + + return pod_spec -def build_scheduler_service_spec(name, spec): + +def build_scheduler_service_spec(cluster_name, spec): return { "apiVersion": "v1", "kind": "Service", "metadata": { - "name": f"{name}-service", + "name": f"{cluster_name}-scheduler-service", "labels": { - "dask.org/cluster-name": name, + "dask.org/cluster-name": cluster_name, + "app": "scheduler", + "service": "scheduler", }, }, "spec": spec, } +def build_scheduler_service_account_spec(cluster_name): + scheduler_service_name = f"{cluster_name}-scheduler-service" + return { + "apiVersion": "v1", + "kind": "ServiceAccount", + "metadata": { + "name": scheduler_service_name, + "labels": { + "dask.org/cluster-name": cluster_name, + "account": scheduler_service_name, + }, + }, + } + + def build_worker_pod_spec(worker_group_name, namespace, cluster_name, uuid, spec): worker_name = f"{worker_group_name}-worker-{uuid}" pod_spec = { @@ -56,10 +82,10 @@ def build_worker_pod_spec(worker_group_name, namespace, cluster_name, uuid, spec "dask.org/cluster-name": cluster_name, "dask.org/workergroup-name": worker_group_name, "dask.org/component": "worker", - "sidecar.istio.io/inject": "false", + "dask.org/worker-name": worker_name, }, }, - "spec": spec, + "spec": copy.copy(spec), } env = [ { @@ -119,6 +145,54 @@ def build_worker_group_spec(name, spec): } +def build_worker_service_spec(cluster_name, worker_name): + return { + "apiVersion": "v1", + "kind": "Service", + "metadata": { + "name": f"{worker_name}-service", + "labels": { + "dask.org/cluster-name": cluster_name, + }, + }, + "spec": { + "type": "ClusterIP", + "selector": { + "dask.org/cluster-name": cluster_name, + "dask.org/worker-name": worker_name, + }, + "ports": [ + { + "name": "tcp-comm", + "protocol": "TCP", + "port": 8788, + "targetPort": "tcp-comm", + }, + { + "name": "http-dashboard", + "protocol": "TCP", + "port": 8787, + "targetPort": "http-dashboard", + }, + ], + }, + } + + +def build_worker_service_account_spec(cluster_name, worker_name): + return { + "apiVersion": "v1", + "kind": "ServiceAccount", + "metadata": { + "name": f"{worker_name}-service", + "labels": { + "dask.org/cluster-name": cluster_name, + "account": f"{worker_name}-service", + }, + }, + } + + def build_cluster_spec(name, worker_spec, scheduler_spec): return { "apiVersion": "kubernetes.dask.org/v1", @@ -151,6 +225,13 @@ async def daskcluster_create(spec, name, namespace, logger, **kwargs): async with kubernetes.client.api_client.ApiClient() as api_client: api = kubernetes.client.CoreV1Api(api_client) + scheduler_service_account_spec = build_scheduler_service_account_spec(name) + kopf.adopt(scheduler_service_account_spec) + await api.create_namespaced_service_account( + namespace=namespace, + body=scheduler_service_account_spec, + ) + # TODO Check for existing scheduler pod scheduler_spec = spec.get("scheduler", {}) data = build_scheduler_pod_spec(name, scheduler_spec.get("spec")) @@ -288,6 +369,24 @@ async def daskworkergroup_update(spec, name, namespace, logger, **kwargs): if workers_needed > 0: for _ in range(workers_needed): + worker_name = f"{name}-worker-{uuid4().hex[:10]}" + + worker_service_account_spec = build_worker_service_account_spec( + spec["cluster"], worker_name + ) + kopf.adopt(worker_service_account_spec) + await api.create_namespaced_service_account( + namespace=namespace, + body=worker_service_account_spec, + ) + + data = build_worker_service_spec(spec["cluster"], worker_name) + kopf.adopt(data) + await api.create_namespaced_service( + namespace=namespace, + body=data, + ) + await wait_for_service(api, data["metadata"]["name"], namespace) data = build_worker_pod_spec( worker_group_name=name, namespace=namespace, diff --git a/dask_kubernetes/operator/tests/resources/simplecluster.yaml b/dask_kubernetes/operator/tests/resources/simplecluster.yaml index ee52c0eab..8ff90b961 100644 --- a/dask_kubernetes/operator/tests/resources/simplecluster.yaml +++ b/dask_kubernetes/operator/tests/resources/simplecluster.yaml @@ -13,8 +13,23 @@ spec: imagePullPolicy: "IfNotPresent" args: - dask-worker + - tcp://simple-cluster-scheduler-service.default.svc.cluster.local:8786 + - --no-nanny - --name - $(DASK_WORKER_NAME) + - --dashboard-address + - 0.0.0.0:8787 + - --listen-address + - tcp://0.0.0.0:8788 + - --contact-address + - tcp://$(DASK_WORKER_NAME)-service.default.svc.cluster.local:8788 + ports: + - name: tcp-comm + containerPort: 8788 + protocol: TCP + - name: http-dashboard + containerPort: 8787 + protocol: TCP env: - name: WORKER_ENV value: hello-world # We dont test the value, just the name diff --git a/dask_kubernetes/operator/tests/test_operator.py b/dask_kubernetes/operator/tests/test_operator.py index f4d602ed5..be28c80f5 100644 --- a/dask_kubernetes/operator/tests/test_operator.py +++ b/dask_kubernetes/operator/tests/test_operator.py @@ -112,7 +112,7 @@ async def test_scalesimplecluster(k8s_cluster, kopf_runner, gen_cluster): await client.wait_for_workers(3) -@pytest.mark.timeout(180) +@pytest.mark.timeout(300) @pytest.mark.asyncio async def test_simplecluster(k8s_cluster, kopf_runner, gen_cluster): with kopf_runner as runner: