diff --git a/server/src/services/k8s/agent_sandbox_provider.py b/server/src/services/k8s/agent_sandbox_provider.py index e7636be99..4834536b9 100644 --- a/server/src/services/k8s/agent_sandbox_provider.py +++ b/server/src/services/k8s/agent_sandbox_provider.py @@ -37,6 +37,9 @@ from src.services.k8s.egress_helper import ( apply_egress_to_spec, build_security_context_for_sandbox_container, + prep_execd_init_for_egress, +) +from src.services.k8s.security_context import ( build_security_context_from_dict, serialize_security_context_to_dict, ) @@ -222,7 +225,10 @@ def _build_pod_spec( egress_mode: str = EGRESS_MODE_DNS, ) -> Dict[str, Any]: """Build pod spec dict for the Sandbox CRD.""" - init_container = self._build_execd_init_container(execd_image) + disable_ipv6_for_egress = network_policy is not None and egress_image is not None + init_container = self._build_execd_init_container( + execd_image, disable_ipv6_for_egress=disable_ipv6_for_egress + ) main_container = self._build_main_container( image_spec=image_spec, entrypoint=entrypoint, @@ -252,7 +258,6 @@ def _build_pod_spec( # Add egress sidecar if network policy is provided apply_egress_to_spec( - pod_spec=pod_spec, containers=containers, network_policy=network_policy, egress_image=egress_image, @@ -262,7 +267,12 @@ def _build_pod_spec( return pod_spec - def _build_execd_init_container(self, execd_image: str) -> V1Container: + def _build_execd_init_container( + self, + execd_image: str, + *, + disable_ipv6_for_egress: bool = False, + ) -> V1Container: """Build init container that copies execd binary to the shared volume.""" script = ( "cp ./execd /opt/opensandbox/bin/execd && " @@ -270,6 +280,10 @@ def _build_execd_init_container(self, execd_image: str) -> V1Container: "chmod +x /opt/opensandbox/bin/execd && " "chmod +x /opt/opensandbox/bin/bootstrap.sh" ) + security_context = None + if disable_ipv6_for_egress: + script, sc_dict = prep_execd_init_for_egress(script) + security_context = build_security_context_from_dict(sc_dict) resources = None if self.execd_init_resources: @@ -290,6 +304,7 @@ def _build_execd_init_container(self, execd_image: str) -> V1Container: ) ], resources=resources, + security_context=security_context, ) def _build_main_container( diff --git a/server/src/services/k8s/batchsandbox_provider.py b/server/src/services/k8s/batchsandbox_provider.py index 942e7b8ad..b9571b987 100644 --- a/server/src/services/k8s/batchsandbox_provider.py +++ b/server/src/services/k8s/batchsandbox_provider.py @@ -41,6 +41,9 @@ from src.services.k8s.egress_helper import ( apply_egress_to_spec, build_security_context_for_sandbox_container, + prep_execd_init_for_egress, +) +from src.services.k8s.security_context import ( build_security_context_from_dict, serialize_security_context_to_dict, ) @@ -181,7 +184,10 @@ def create_workload( extra_volumes, extra_mounts = self._extract_template_pod_extras() # Build init container for execd installation - init_container = self._build_execd_init_container(execd_image) + disable_ipv6_for_egress = network_policy is not None and egress_image is not None + init_container = self._build_execd_init_container( + execd_image, disable_ipv6_for_egress=disable_ipv6_for_egress + ) # Build main container with execd support main_container = self._build_main_container( @@ -219,7 +225,6 @@ def create_workload( # Add egress sidecar if network policy is provided apply_egress_to_spec( - pod_spec=pod_spec, containers=containers, network_policy=network_policy, egress_image=egress_image, @@ -490,7 +495,12 @@ def _build_task_template( } } - def _build_execd_init_container(self, execd_image: str) -> V1Container: + def _build_execd_init_container( + self, + execd_image: str, + *, + disable_ipv6_for_egress: bool = False, + ) -> V1Container: """ Build init container for execd installation. @@ -503,6 +513,8 @@ def _build_execd_init_container(self, execd_image: str) -> V1Container: Args: execd_image: execd container image + disable_ipv6_for_egress: When True, disable IPv6 in the Pod netns first + (privileged) then install binaries; used with egress sidecar. Returns: V1Container: Init container spec @@ -514,6 +526,10 @@ def _build_execd_init_container(self, execd_image: str) -> V1Container: "chmod +x /opt/opensandbox/bin/execd && " "chmod +x /opt/opensandbox/bin/bootstrap.sh" ) + security_context = None + if disable_ipv6_for_egress: + script, sc_dict = prep_execd_init_for_egress(script) + security_context = build_security_context_from_dict(sc_dict) resources = None if self.execd_init_resources: @@ -534,6 +550,7 @@ def _build_execd_init_container(self, execd_image: str) -> V1Container: ) ], resources=resources, + security_context=security_context, ) def _build_main_container( diff --git a/server/src/services/k8s/egress_helper.py b/server/src/services/k8s/egress_helper.py index f79bf18f1..8f958514b 100644 --- a/server/src/services/k8s/egress_helper.py +++ b/server/src/services/k8s/egress_helper.py @@ -13,14 +13,14 @@ # limitations under the License. """ -Egress sidecar helper functions for Kubernetes workloads. +Egress sidecar helpers for Kubernetes pod specs. -This module provides shared utilities for building egress sidecar containers -and related configurations that can be reused across different workload providers. +Public entry points: ``prep_execd_init_for_egress``, ``build_security_context_for_sandbox_container``, +``apply_egress_to_spec``. SecurityContext dict ↔ V1 conversion lives in ``security_context``. """ import json -from typing import Dict, Any, List, Optional +from typing import Any, Dict, List, Optional from src.api.schema import NetworkPolicy from src.config import EGRESS_MODE_DNS @@ -30,127 +30,38 @@ OPENSANDBOX_EGRESS_TOKEN, ) -# Privileged sidecar: IPv6 disable is applied at container start (see EGRESS_K8S_START_COMMAND), not via Pod sysctls. -EGRESS_K8S_START_COMMAND = [ - "/bin/sh", - "-c", - "sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1 && /egress", -] - -def build_egress_sidecar_container( - egress_image: str, - network_policy: NetworkPolicy, - egress_auth_token: Optional[str] = None, - egress_mode: str = EGRESS_MODE_DNS, -) -> Dict[str, Any]: +def prep_execd_init_for_egress(exec_install_script: str) -> tuple[str, Dict[str, Any]]: """ - Build egress sidecar container specification for Kubernetes Pod. - - This function creates a container spec that can be added to a Pod's containers - list. The sidecar container will: - - Run **privileged** with a startup command that runs ``sysctl`` then ``/egress`` - - Receive network policy via OPENSANDBOX_EGRESS_RULES environment variable + Prepare execd init when an egress sidecar is used: disable IPv6 in the Pod netns, then install. - Note: In Kubernetes, containers in the same Pod share the network namespace, - so the main container can access the sidecar's ports (44772 for execd, 8080 for HTTP) - via localhost without explicit port declarations. - - IPv6 for ``all`` interfaces is disabled inside the sidecar start script; Pod-level - ``net.ipv6.conf.all.disable_ipv6`` sysctl injection is not used. - - Args: - egress_image: Container image for the egress sidecar - network_policy: Network policy configuration to enforce - egress_auth_token: Optional bearer token for egress HTTP API - egress_mode: ``dns`` or ``dns+nft`` (``OPENSANDBOX_EGRESS_MODE``) + Writes ``/proc/sys/.../disable_ipv6`` (no ``sysctl`` binary required). The returned + security context dict must be applied to the execd init container (typically via + ``build_security_context_from_dict`` in ``security_context``). Returns: - Dict containing container specification compatible with Kubernetes Pod spec. - This dict can be directly added to the Pod's containers list. - - Example: - ```python - sidecar = build_egress_sidecar_container( - egress_image="opensandbox/egress:v1.0.3", - network_policy=NetworkPolicy( - default_action="deny", - egress=[NetworkRule(action="allow", target="pypi.org")] - ) - ) - pod_spec["containers"].append(sidecar) - ``` + ``(prefixed_shell_script, {"privileged": True})`` """ - # Serialize network policy to JSON for environment variable - policy_payload = json.dumps( - network_policy.model_dump(by_alias=True, exclude_none=True) + script = ( + "set -e; " + "echo 1 > /proc/sys/net/ipv6/conf/all/disable_ipv6 && " + f"{exec_install_script}" ) - - env = [ - { - "name": EGRESS_RULES_ENV, - "value": policy_payload, - }, - { - "name": EGRESS_MODE_ENV, - "value": egress_mode, - }, - ] - if egress_auth_token: - env.append( - { - "name": OPENSANDBOX_EGRESS_TOKEN, - "value": egress_auth_token, - } - ) - - # Build container specification - container_spec: Dict[str, Any] = { - "name": "egress", - "image": egress_image, - "command": EGRESS_K8S_START_COMMAND, - "env": env, - "securityContext": _build_security_context_for_egress(), - } - - return container_spec - - -def _build_security_context_for_egress() -> Dict[str, Any]: - """ - Build security context for egress sidecar container. - - The sidecar runs **privileged** so it can manage iptables/nft and run sysctl - inside the container network namespace at startup. - - Returns: - Dict containing security context with ``privileged: true``. - """ - return { - "privileged": True, - } + return script, {"privileged": True} def build_security_context_for_sandbox_container( has_network_policy: bool, ) -> Dict[str, Any]: """ - Build security context for main sandbox container. - - When network policy is enabled, the main container should drop NET_ADMIN - capability to prevent it from modifying network configuration. Only the - egress sidecar should have NET_ADMIN. - - Args: - has_network_policy: Whether network policy is enabled for this sandbox - - Returns: - Dict containing security context configuration. If has_network_policy is True, - includes NET_ADMIN in the drop list. Otherwise, returns empty dict. + Security context dict for the main sandbox container. + + When network policy is enabled, drops ``NET_ADMIN`` so only the egress sidecar can + mutate network stack state. """ if not has_network_policy: return {} - + return { "capabilities": { "drop": ["NET_ADMIN"], @@ -159,7 +70,6 @@ def build_security_context_for_sandbox_container( def apply_egress_to_spec( - pod_spec: Dict[str, Any], containers: List[Dict[str, Any]], network_policy: Optional[NetworkPolicy], egress_image: Optional[str], @@ -167,144 +77,30 @@ def apply_egress_to_spec( egress_mode: str = EGRESS_MODE_DNS, ) -> None: """ - Apply egress sidecar configuration to Pod spec. - - This function adds the egress sidecar container to the containers list. - It does **not** mutate Pod ``securityContext.sysctls`` for IPv6; the sidecar - disables ``net.ipv6.conf.all.disable_ipv6`` via its startup command. - - Args: - pod_spec: Pod specification dict (will be modified in place) - containers: List of container dicts (will be modified in place) - network_policy: Optional network policy configuration - egress_image: Optional egress sidecar image - egress_mode: ``dns`` or ``dns+nft`` (default ``EGRESS_MODE_DNS``). - - Example: - ```python - containers = [main_container_dict] - pod_spec = {"containers": containers, ...} - - apply_egress_to_spec( - pod_spec=pod_spec, - containers=containers, - network_policy=network_policy, - egress_image=egress_image, - ) - ``` - + Append the egress sidecar to ``containers``. IPv6 is handled in execd init + (``prep_execd_init_for_egress``); Pod-level sysctls are not modified. """ if not network_policy or not egress_image: return - # Build and add egress sidecar container - sidecar_container = build_egress_sidecar_container( - egress_image=egress_image, - network_policy=network_policy, - egress_auth_token=egress_auth_token, - egress_mode=egress_mode, - ) - containers.append(sidecar_container) - - -def build_security_context_from_dict( - security_context_dict: Dict[str, Any], -) -> Optional[Any]: - """ - Convert security context dict to V1SecurityContext object. - - This is a helper function to convert the dict returned by - build_security_context_for_sandbox_container() into a Kubernetes - V1SecurityContext object that can be used in V1Container. - - Args: - security_context_dict: Security context configuration dict - - Returns: - V1SecurityContext object or None if dict is empty - - Example: - ```python - from kubernetes.client import V1Container - - security_context_dict = build_security_context_for_sandbox_container(True) - security_context = build_security_context_from_dict(security_context_dict) - - container = V1Container( - name="sandbox", - security_context=security_context, - ) - ``` - """ - if not security_context_dict: - return None - - from kubernetes.client import V1SecurityContext, V1Capabilities - - capabilities = None - if "capabilities" in security_context_dict: - caps_dict = security_context_dict["capabilities"] - add_caps = caps_dict.get("add", []) - drop_caps = caps_dict.get("drop", []) - capabilities = V1Capabilities( - add=add_caps if add_caps else None, - drop=drop_caps if drop_caps else None, - ) - - privileged = security_context_dict.get("privileged") - - if capabilities is None and privileged is None: - return None - - return V1SecurityContext( - capabilities=capabilities, - privileged=privileged, + policy_payload = json.dumps( + network_policy.model_dump(by_alias=True, exclude_none=True) ) + env: List[Dict[str, str]] = [ + {"name": EGRESS_RULES_ENV, "value": policy_payload}, + {"name": EGRESS_MODE_ENV, "value": egress_mode}, + ] + if egress_auth_token: + env.append({"name": OPENSANDBOX_EGRESS_TOKEN, "value": egress_auth_token}) -def serialize_security_context_to_dict( - security_context: Optional[Any], -) -> Optional[Dict[str, Any]]: - """ - Serialize V1SecurityContext to dict format for CRD. - - This function converts a V1SecurityContext object (from V1Container) - into a dict format that can be used in Kubernetes CRD specifications. - - Args: - security_context: V1SecurityContext object or None - - Returns: - Dict representation of security context or None - - Example: - ```python - container_dict = { - "name": container.name, - "image": container.image, + containers.append( + { + "name": "egress", + "image": egress_image, + "env": env, + "securityContext": { + "capabilities": {"add": ["NET_ADMIN"]}, + }, } - - if container.security_context: - container_dict["securityContext"] = serialize_security_context_to_dict( - container.security_context - ) - ``` - """ - if not security_context: - return None - - result: Dict[str, Any] = {} - - if security_context.capabilities: - caps: Dict[str, Any] = {} - if security_context.capabilities.add: - caps["add"] = security_context.capabilities.add - if security_context.capabilities.drop: - caps["drop"] = security_context.capabilities.drop - if caps: - result["capabilities"] = caps - - if security_context.privileged is not None: - result["privileged"] = security_context.privileged - - return result if result else None + ) diff --git a/server/src/services/k8s/security_context.py b/server/src/services/k8s/security_context.py new file mode 100644 index 000000000..5412653a9 --- /dev/null +++ b/server/src/services/k8s/security_context.py @@ -0,0 +1,75 @@ +# Copyright 2026 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Kubernetes V1SecurityContext ↔ plain dict helpers for CRD pod specs.""" + +from typing import Any, Dict, Optional + + +def build_security_context_from_dict( + security_context_dict: Dict[str, Any], +) -> Optional[Any]: + """ + Convert a security context dict to ``V1SecurityContext``. + + Empty dict returns None. + """ + if not security_context_dict: + return None + + from kubernetes.client import V1SecurityContext, V1Capabilities + + capabilities = None + if "capabilities" in security_context_dict: + caps_dict = security_context_dict["capabilities"] + add_caps = caps_dict.get("add", []) + drop_caps = caps_dict.get("drop", []) + capabilities = V1Capabilities( + add=add_caps if add_caps else None, + drop=drop_caps if drop_caps else None, + ) + + privileged = security_context_dict.get("privileged") + + if capabilities is None and privileged is None: + return None + + return V1SecurityContext( + capabilities=capabilities, + privileged=privileged, + ) + + +def serialize_security_context_to_dict( + security_context: Optional[Any], +) -> Optional[Dict[str, Any]]: + """Serialize ``V1SecurityContext`` to a CRD-friendly dict.""" + if not security_context: + return None + + result: Dict[str, Any] = {} + + if security_context.capabilities: + caps: Dict[str, Any] = {} + if security_context.capabilities.add: + caps["add"] = security_context.capabilities.add + if security_context.capabilities.drop: + caps["drop"] = security_context.capabilities.drop + if caps: + result["capabilities"] = caps + + if security_context.privileged is not None: + result["privileged"] = security_context.privileged + + return result if result else None diff --git a/server/tests/k8s/test_agent_sandbox_provider.py b/server/tests/k8s/test_agent_sandbox_provider.py index 425d1385d..29ed21c97 100644 --- a/server/tests/k8s/test_agent_sandbox_provider.py +++ b/server/tests/k8s/test_agent_sandbox_provider.py @@ -540,8 +540,18 @@ def test_create_workload_with_network_policy_adds_sidecar(self, mock_k8s_client) assert "OPENSANDBOX_EGRESS_RULES" in env_vars assert env_vars["OPENSANDBOX_EGRESS_MODE"] == EGRESS_MODE_DNS - assert sidecar.get("securityContext", {}).get("privileged") is True - assert "net.ipv6.conf.all.disable_ipv6=1" in sidecar["command"][2] + caps = sidecar.get("securityContext", {}).get("capabilities", {}) + assert "NET_ADMIN" in caps.get("add", []) + assert sidecar.get("securityContext", {}).get("privileged") is not True + assert "command" not in sidecar + + inits = pod_spec.get("initContainers", []) + assert len(inits) == 1 + execd_init = inits[0] + assert execd_init["name"] == "execd-installer" + assert execd_init["image"] == "execd:latest" + assert execd_init.get("securityContext", {}).get("privileged") is True + assert "/proc/sys/net/ipv6/conf/all/disable_ipv6" in execd_init["args"][0] def test_create_workload_with_network_policy_persists_annotation_and_sidecar_token(self, mock_k8s_client): provider = AgentSandboxProvider(mock_k8s_client) @@ -635,7 +645,10 @@ def test_create_workload_with_network_policy_does_not_add_pod_ipv6_sysctls(self, assert "securityContext" not in pod_spec or "sysctls" not in pod_spec.get("securityContext", {}) sidecar = next(c for c in pod_spec["containers"] if c["name"] == "egress") - assert sidecar["command"][2].startswith("sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1") + assert "command" not in sidecar + execd_init = pod_spec["initContainers"][0] + assert execd_init["name"] == "execd-installer" + assert "/proc/sys/net/ipv6/conf/all/disable_ipv6" in execd_init["args"][0] def test_create_workload_with_network_policy_drops_net_admin_from_main_container(self, mock_k8s_client): """ diff --git a/server/tests/k8s/test_batchsandbox_provider.py b/server/tests/k8s/test_batchsandbox_provider.py index ddc66b4af..4413cef2d 100644 --- a/server/tests/k8s/test_batchsandbox_provider.py +++ b/server/tests/k8s/test_batchsandbox_provider.py @@ -1245,10 +1245,18 @@ def test_create_workload_with_network_policy_adds_sidecar(self, mock_k8s_client) assert "OPENSANDBOX_EGRESS_RULES" in env_vars assert env_vars["OPENSANDBOX_EGRESS_MODE"] == EGRESS_MODE_DNS - # Egress sidecar: privileged + sysctl + /egress startup (Kubernetes) - assert sidecar.get("securityContext", {}).get("privileged") is True - assert sidecar.get("command") is not None - assert "net.ipv6.conf.all.disable_ipv6=1" in sidecar["command"][2] + caps = sidecar.get("securityContext", {}).get("capabilities", {}) + assert "NET_ADMIN" in caps.get("add", []) + assert sidecar.get("securityContext", {}).get("privileged") is not True + assert "command" not in sidecar + + inits = pod_spec.get("initContainers", []) + assert len(inits) == 1 + execd_init = inits[0] + assert execd_init["name"] == "execd-installer" + assert execd_init["image"] == "execd:latest" + assert execd_init.get("securityContext", {}).get("privileged") is True + assert "/proc/sys/net/ipv6/conf/all/disable_ipv6" in execd_init["args"][0] def test_create_workload_with_network_policy_persists_annotation_and_sidecar_token(self, mock_k8s_client): provider = BatchSandboxProvider(mock_k8s_client) @@ -1311,7 +1319,7 @@ def test_create_workload_with_egress_mode_dns_nft(self, mock_k8s_client): assert env_vars["OPENSANDBOX_EGRESS_MODE"] == EGRESS_MODE_DNS_NFT def test_create_workload_with_network_policy_does_not_add_pod_ipv6_sysctls(self, mock_k8s_client): - """IPv6 all.disable is applied in egress sidecar start command, not Pod sysctls.""" + """IPv6 all.disable is applied in privileged execd init, not Pod sysctls.""" provider = BatchSandboxProvider(mock_k8s_client) mock_k8s_client.create_custom_object.return_value = { "metadata": {"name": "test-id", "uid": "test-uid"} @@ -1343,7 +1351,10 @@ def test_create_workload_with_network_policy_does_not_add_pod_ipv6_sysctls(self, assert "securityContext" not in pod_spec or "sysctls" not in pod_spec.get("securityContext", {}) sidecar = next(c for c in pod_spec["containers"] if c["name"] == "egress") - assert sidecar["command"][2].startswith("sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1") + assert "command" not in sidecar + execd_init = pod_spec["initContainers"][0] + assert execd_init["name"] == "execd-installer" + assert "/proc/sys/net/ipv6/conf/all/disable_ipv6" in execd_init["args"][0] def test_create_workload_with_network_policy_drops_net_admin_from_main_container(self, mock_k8s_client): """ diff --git a/server/tests/k8s/test_egress_helper.py b/server/tests/k8s/test_egress_helper.py index 00d29be4c..47bcc6081 100644 --- a/server/tests/k8s/test_egress_helper.py +++ b/server/tests/k8s/test_egress_helper.py @@ -17,20 +17,39 @@ """ import json +from typing import Optional from src.api.schema import NetworkPolicy, NetworkRule from src.config import EGRESS_MODE_DNS, EGRESS_MODE_DNS_NFT from src.services.constants import EGRESS_MODE_ENV, EGRESS_RULES_ENV, OPENSANDBOX_EGRESS_TOKEN from src.services.k8s.egress_helper import ( - EGRESS_K8S_START_COMMAND, apply_egress_to_spec, - build_egress_sidecar_container, build_security_context_for_sandbox_container, + prep_execd_init_for_egress, ) -class TestBuildEgressSidecarContainer: - """Tests for build_egress_sidecar_container function.""" +def _egress_container( + egress_image: str, + network_policy: NetworkPolicy, + *, + egress_auth_token: Optional[str] = None, + egress_mode: str = EGRESS_MODE_DNS, +) -> dict: + """Sidecar dict produced by ``apply_egress_to_spec``.""" + containers: list = [] + apply_egress_to_spec( + containers, + network_policy, + egress_image, + egress_auth_token=egress_auth_token, + egress_mode=egress_mode, + ) + return containers[0] + + +class TestEgressSidecarViaApply: + """Egress sidecar shape (via ``apply_egress_to_spec``).""" def test_builds_container_with_basic_config(self): """Test that container is built with correct basic configuration.""" @@ -42,7 +61,7 @@ def test_builds_container_with_basic_config(self): ], ) - container = build_egress_sidecar_container(egress_image, network_policy) + container = _egress_container(egress_image, network_policy) assert container["name"] == "egress" assert container["image"] == egress_image @@ -57,7 +76,7 @@ def test_contains_egress_rules_environment_variable(self): egress=[NetworkRule(action="allow", target="example.com")], ) - container = build_egress_sidecar_container(egress_image, network_policy) + container = _egress_container(egress_image, network_policy) env_vars = container["env"] assert len(env_vars) == 2 @@ -73,7 +92,7 @@ def test_contains_egress_token_when_provided(self): egress=[NetworkRule(action="allow", target="example.com")], ) - container = build_egress_sidecar_container( + container = _egress_container( egress_image, network_policy, egress_auth_token="egress-token", @@ -90,7 +109,7 @@ def test_egress_mode_dns_nft(self): egress=[NetworkRule(action="allow", target="example.com")], ) - container = build_egress_sidecar_container( + container = _egress_container( egress_image, network_policy, egress_mode=EGRESS_MODE_DNS_NFT, @@ -110,14 +129,12 @@ def test_serializes_network_policy_correctly(self): ], ) - container = build_egress_sidecar_container(egress_image, network_policy) + container = _egress_container(egress_image, network_policy) env_value = container["env"][0]["value"] - # Should be valid JSON policy_dict = json.loads(env_value) - - # Verify structure - assert "defaultAction" in policy_dict # by_alias=True converts default_action + + assert "defaultAction" in policy_dict assert policy_dict["defaultAction"] == "deny" assert "egress" in policy_dict assert len(policy_dict["egress"]) == 2 @@ -134,11 +151,11 @@ def test_handles_empty_egress_rules(self): egress=[], ) - container = build_egress_sidecar_container(egress_image, network_policy) + container = _egress_container(egress_image, network_policy) env_value = container["env"][0]["value"] policy_dict = json.loads(env_value) - + assert policy_dict["defaultAction"] == "allow" assert policy_dict["egress"] == [] @@ -149,36 +166,34 @@ def test_handles_missing_default_action(self): egress=[NetworkRule(action="allow", target="example.com")], ) - container = build_egress_sidecar_container(egress_image, network_policy) + container = _egress_container(egress_image, network_policy) env_value = container["env"][0]["value"] policy_dict = json.loads(env_value) - - # defaultAction should be excluded if None (exclude_none=True) + assert "defaultAction" not in policy_dict or policy_dict.get("defaultAction") is None assert "egress" in policy_dict - def test_security_context_is_privileged(self): - """Egress sidecar runs privileged (Kubernetes).""" + def test_security_context_adds_net_admin_not_privileged(self): + """Egress sidecar uses NET_ADMIN only (IPv6 is disabled in execd init when egress is on).""" egress_image = "opensandbox/egress:v1.0.3" network_policy = NetworkPolicy( default_action="deny", egress=[], ) - container = build_egress_sidecar_container(egress_image, network_policy) + container = _egress_container(egress_image, network_policy) security_context = container["securityContext"] - assert security_context.get("privileged") is True + assert security_context.get("privileged") is not True + assert "NET_ADMIN" in security_context.get("capabilities", {}).get("add", []) - def test_start_command_runs_sysctl_then_egress(self): - container = build_egress_sidecar_container( + def test_no_command_uses_image_entrypoint(self): + container = _egress_container( "opensandbox/egress:v1.0.3", NetworkPolicy(default_action="deny", egress=[]), ) - assert container["command"] == EGRESS_K8S_START_COMMAND - assert "net.ipv6.conf.all.disable_ipv6=1" in container["command"][2] - assert container["command"][2].endswith("&& /egress") + assert "command" not in container def test_container_spec_is_valid_kubernetes_format(self): """Test that returned container spec is in valid Kubernetes format.""" @@ -188,20 +203,18 @@ def test_container_spec_is_valid_kubernetes_format(self): egress=[NetworkRule(action="allow", target="example.com")], ) - container = build_egress_sidecar_container(egress_image, network_policy) + container = _egress_container(egress_image, network_policy) - # Verify all required fields are present assert "name" in container assert "image" in container assert "env" in container assert "securityContext" in container - - # Verify env is a list of dicts with name/value + assert isinstance(container["env"], list) assert len(container["env"]) > 0 assert "name" in container["env"][0] assert "value" in container["env"][0] - assert "command" in container + assert "command" not in container def test_handles_wildcard_domains(self): """Test that wildcard domains in egress rules are handled correctly.""" @@ -214,11 +227,11 @@ def test_handles_wildcard_domains(self): ], ) - container = build_egress_sidecar_container(egress_image, network_policy) + container = _egress_container(egress_image, network_policy) env_value = container["env"][0]["value"] policy_dict = json.loads(env_value) - + assert len(policy_dict["egress"]) == 2 assert policy_dict["egress"][0]["target"] == "*.python.org" assert policy_dict["egress"][1]["target"] == "pypi.org" @@ -235,7 +248,7 @@ def test_returns_empty_dict_when_no_network_policy(self): def test_drops_net_admin_when_network_policy_enabled(self): """Test that NET_ADMIN is dropped when network policy is enabled.""" result = build_security_context_for_sandbox_container(has_network_policy=True) - + assert "capabilities" in result assert "drop" in result["capabilities"] assert "NET_ADMIN" in result["capabilities"]["drop"] @@ -246,7 +259,6 @@ class TestApplyEgressToSpec: def test_adds_egress_sidecar_container(self): """Test that egress sidecar container is added to containers list.""" - pod_spec: dict = {} containers: list = [] network_policy = NetworkPolicy( default_action="deny", @@ -255,19 +267,17 @@ def test_adds_egress_sidecar_container(self): egress_image = "opensandbox/egress:v1.0.3" apply_egress_to_spec( - pod_spec=pod_spec, - containers=containers, - network_policy=network_policy, - egress_image=egress_image, + containers, + network_policy, + egress_image, ) assert len(containers) == 1 assert containers[0]["name"] == "egress" assert containers[0]["image"] == egress_image - def test_does_not_add_pod_sysctls_for_ipv6(self): - """IPv6 disable is not merged into Pod securityContext.sysctls (sidecar start script).""" - pod_spec: dict = {} + def test_does_not_touch_unrelated_pod_state(self): + """apply_egress_to_spec only appends to containers (no pod_spec parameter).""" containers: list = [] network_policy = NetworkPolicy( default_action="deny", @@ -276,16 +286,15 @@ def test_does_not_add_pod_sysctls_for_ipv6(self): egress_image = "opensandbox/egress:v1.0.3" apply_egress_to_spec( - pod_spec=pod_spec, - containers=containers, - network_policy=network_policy, - egress_image=egress_image, + containers, + network_policy, + egress_image, ) - assert "securityContext" not in pod_spec + assert len(containers) == 1 - def test_preserves_existing_pod_sysctls_without_merging_ipv6(self): - """Existing Pod sysctls are left unchanged when egress is applied.""" + def test_preserves_existing_pod_sysctls_when_not_passed_in(self): + """Callers keep pod sysctls in their own dict; apply does not mutate them.""" pod_spec: dict = { "securityContext": { "sysctls": [ @@ -302,10 +311,9 @@ def test_preserves_existing_pod_sysctls_without_merging_ipv6(self): egress_image = "opensandbox/egress:v1.0.3" apply_egress_to_spec( - pod_spec=pod_spec, - containers=containers, - network_policy=network_policy, - egress_image=egress_image, + containers, + network_policy, + egress_image, ) sysctls = pod_spec["securityContext"]["sysctls"] @@ -317,22 +325,18 @@ def test_preserves_existing_pod_sysctls_without_merging_ipv6(self): def test_no_op_when_no_network_policy(self): """Test that function does nothing when network_policy is None.""" - pod_spec: dict = {} containers: list = [] apply_egress_to_spec( - pod_spec=pod_spec, - containers=containers, - network_policy=None, - egress_image="opensandbox/egress:v1.0.3", + containers, + None, + "opensandbox/egress:v1.0.3", ) assert len(containers) == 0 - assert "securityContext" not in pod_spec def test_no_op_when_no_egress_image(self): """Test that function does nothing when egress_image is None.""" - pod_spec: dict = {} containers: list = [] network_policy = NetworkPolicy( default_action="deny", @@ -340,11 +344,18 @@ def test_no_op_when_no_egress_image(self): ) apply_egress_to_spec( - pod_spec=pod_spec, - containers=containers, - network_policy=network_policy, - egress_image=None, + containers, + network_policy, + None, ) assert len(containers) == 0 - assert "securityContext" not in pod_spec + + +class TestPrepExecdInitForEgress: + def test_returns_privileged_security_dict_and_prefixed_script(self): + base = "cp ./execd /opt/opensandbox/bin/execd" + script, sc = prep_execd_init_for_egress(base) + assert sc == {"privileged": True} + assert "/proc/sys/net/ipv6/conf/all/disable_ipv6" in script + assert script.endswith(base) diff --git a/server/tests/k8s/test_kubernetes_service.py b/server/tests/k8s/test_kubernetes_service.py index 438b53c57..8a50871a7 100644 --- a/server/tests/k8s/test_kubernetes_service.py +++ b/server/tests/k8s/test_kubernetes_service.py @@ -227,7 +227,8 @@ async def test_create_sandbox_with_no_timeout_calls_provider_with_expires_at_non assert kwargs["expires_at"] is None assert kwargs["labels"].get(SANDBOX_MANUAL_CLEANUP_LABEL) == "true" - def test_create_sandbox_with_network_policy_passes_egress_token_and_annotations( + @pytest.mark.asyncio + async def test_create_sandbox_with_network_policy_passes_egress_token_and_annotations( self, k8s_service, create_sandbox_request ): create_sandbox_request.network_policy = NetworkPolicy(default_action="deny", egress=[]) @@ -245,14 +246,15 @@ def test_create_sandbox_with_network_policy_passes_egress_token_and_annotations( "src.services.k8s.kubernetes_service.generate_egress_token", return_value="egress-token", ): - k8s_service.create_sandbox(create_sandbox_request) + await k8s_service.create_sandbox(create_sandbox_request) _, kwargs = k8s_service.workload_provider.create_workload.call_args assert kwargs["egress_auth_token"] == "egress-token" assert kwargs["egress_mode"] == EGRESS_MODE_DNS assert kwargs["annotations"][SANDBOX_EGRESS_AUTH_TOKEN_METADATA_KEY] == "egress-token" - def test_create_sandbox_with_network_policy_passes_egress_mode_dns_nft_from_config( + @pytest.mark.asyncio + async def test_create_sandbox_with_network_policy_passes_egress_mode_dns_nft_from_config( self, k8s_service, create_sandbox_request ): create_sandbox_request.network_policy = NetworkPolicy(default_action="deny", egress=[]) @@ -273,7 +275,7 @@ def test_create_sandbox_with_network_policy_passes_egress_mode_dns_nft_from_conf "src.services.k8s.kubernetes_service.generate_egress_token", return_value="egress-token", ): - k8s_service.create_sandbox(create_sandbox_request) + await k8s_service.create_sandbox(create_sandbox_request) _, kwargs = k8s_service.workload_provider.create_workload.call_args assert kwargs["egress_mode"] == EGRESS_MODE_DNS_NFT