diff --git a/README.md b/README.md index 6d15c28..c18048d 100644 --- a/README.md +++ b/README.md @@ -181,10 +181,27 @@ for line in client.log('my-pod', follow=True): print(line) ``` +Execute a command inside a pod +```python +from lightkube import Client + +client = Client() + +# Capture stdout or raise ApiError if error code is != 0 +res = client.exec('my-pod', namespace='default', command=['ls', '-l', '/'], + stdout=True, raise_on_error=True) +print(res.stdout) + +# Send data to stdin and capture output +res = client.exec('my-pod', namespace='default', command=['cat'], + stdin='hello\n', stdout=True) +print(res.stdout) +print(res.exit_code) +``` + ## Unsupported features The following features are not supported at the moment: -* Special subresources `attach`, `exec`, `portforward` and `proxy`. -* `auth-provider` authentication method is not supported. The supported - authentication methods are `token`, `username` + `password` and `exec`. \ No newline at end of file +* Special subresources `attach`, `portforward` and `proxy`. +* `auth-provider` authentication method is not supported. The supported authentication methods are `token`, `username` + `password` and `exec`. \ No newline at end of file diff --git a/docs/async-usage.md b/docs/async-usage.md index 67b43b6..509e604 100644 --- a/docs/async-usage.md +++ b/docs/async-usage.md @@ -115,3 +115,22 @@ async def example(): async for line in client.log('my-pod', follow=True): print(line) ``` + +Execute a command inside a pod +```python +from lightkube import AsyncClient + +async def example(): + client = AsyncClient() + + # List a directory + res = await client.exec('my-pod', namespace='default', command=['ls', '-l', '/'], + stdout=True, raise_on_error=True) + print(res.stdout) + + # Send data to stdin and capture output + res = await client.exec('my-pod', namespace='default', command=['cat'], + stdin='hello\n', stdout=True) + print(res.stdout) + print(res.exit_code) +``` diff --git a/e2e-tests/test_client.py b/e2e-tests/test_client.py index c617489..9b823f1 100644 --- a/e2e-tests/test_client.py +++ b/e2e-tests/test_client.py @@ -2,7 +2,7 @@ from pathlib import Path from random import choices from string import ascii_lowercase -from typing import Iterator +from typing import Iterator, Union import pytest @@ -382,7 +382,7 @@ async def test_wait_namespaced_async(resource, for_condition, spec): @pytest.fixture(scope="function") -def sample_crd() -> Iterator[GenericNamespacedResource | GenericGlobalResource]: +def sample_crd() -> Iterator[Union[GenericNamespacedResource, GenericGlobalResource]]: client = Client() fname = Path(__file__).parent.joinpath("test-crd.yaml") with fname.open() as f: @@ -437,3 +437,72 @@ async def test_load_in_cluster_generic_resources_async(sample_crd): # Assert that we now have a generic resource for this CR gr = get_generic_resource(cr_version, cr_kind) assert gr is not None + + +def test_exec_integration_ls_and_cat(): + client = Client() + pod = client.create(create_pod("exec-busybox", "sleep 300")) + try: + client.wait(Pod, pod.metadata.name, for_conditions=["Ready"]) + + ls_res = client.exec( + pod.metadata.name, + namespace=pod.metadata.namespace, + command=["/bin/sh", "-c", "ls /"], + stdout=True, + raise_on_error=True, + ) + assert "bin" in ls_res.stdout.split() + + try: + cat_res = client.exec( + pod.metadata.name, + namespace=pod.metadata.namespace, + command=["/bin/cat"], + stdin="hello from stdin\n", + stdout=True, + raise_on_error=True, + ) + except ApiError as exc: + if "Only subprotocol v5.channel.k8s.io" in str(exc): + pytest.skip("stdin not supported without v5.channel.k8s.io protocol") + raise + assert cat_res.stdout == "hello from stdin\n" + assert cat_res.exit_code == 0 + finally: + client.delete(Pod, pod.metadata.name) + + +@pytest.mark.asyncio +async def test_exec_integration_ls_and_cat_async(): + client = AsyncClient() + pod = await client.create(create_pod("exec-busybox-async", "sleep 300")) + try: + await client.wait(Pod, pod.metadata.name, for_conditions=["Ready"]) + + ls_res = await client.exec( + pod.metadata.name, + namespace=pod.metadata.namespace, + command=["/bin/sh", "-c", "ls /"], + stdout=True, + raise_on_error=True, + ) + assert "bin" in ls_res.stdout.split() + + try: + cat_res = await client.exec( + pod.metadata.name, + namespace=pod.metadata.namespace, + command=["/bin/cat"], + stdin="hello from stdin\n", + stdout=True, + raise_on_error=True, + ) + except ApiError as exc: + if "Only subprotocol v5.channel.k8s.io" in str(exc): + pytest.skip("stdin not supported without v5.channel.k8s.io protocol") + raise + assert cat_res.stdout == "hello from stdin\n" + finally: + await client.delete(Pod, pod.metadata.name) + await client.close() diff --git a/pyproject.toml b/pyproject.toml index 661d124..d90ad01 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "lightkube" -version = "0.19.1" +version = "0.20.0" description = "Lightweight kubernetes client library" readme = "README.md" authors = [ @@ -26,6 +26,7 @@ dependencies = [ "lightkube-models >= 1.15.12.0", "httpx[http2] >= 0.28.1, < 1.0.0", "PyYAML", + "httpx-ws>=0.6.2", ] [project.optional-dependencies] diff --git a/src/lightkube/core/async_client.py b/src/lightkube/core/async_client.py index 2db4faa..da974d8 100644 --- a/src/lightkube/core/async_client.py +++ b/src/lightkube/core/async_client.py @@ -1,4 +1,4 @@ -from typing import AsyncIterable, AsyncIterator, Dict, Iterable, List, Optional, Tuple, Type, Union, overload +from typing import AsyncIterable, AsyncIterator, BinaryIO, Dict, Iterable, List, Optional, Tuple, Type, Union, overload import httpx @@ -7,7 +7,7 @@ from ..config.kubeconfig import KubeConfig, SingleConfig from ..core import resource as r from ..core.exceptions import ConditionError, ObjectDeleted -from ..types import CascadeType, OnErrorHandler, PatchType, on_error_raise +from ..types import CascadeType, ExecResponse, OnErrorHandler, PatchType, on_error_raise from .client import ( AllNamespacedResource, FieldSelector, @@ -755,6 +755,52 @@ async def stream_log() -> AsyncIterator[str]: return stream_log() + async def exec( + self, + name: str, + *, + namespace: Optional[str] = None, + command: Union[str, Iterable[str]], + stdin: Union[str, bytes, BinaryIO, None] = None, + stdout: Union[BinaryIO, bool] = False, + stderr: Union[BinaryIO, bool] = False, + decode: Optional[str] = "utf-8", + raise_on_error: bool = False, + timeout: Optional[float] = None, + ) -> ExecResponse: + """Execute a command in a Pod and return stdout/stderr. + + Parameters: + name: Name of the Pod. + namespace: Name of the namespace containing the Pod. + command: Command to execute in the Pod. + stdin: Data to send to stdin. This can be either a string, bytes or a binary stream. + Strings will be encoded as utf-8 before sending. + stdout: If `True`, the command's stdout will be captured and returned in the response. + If a binary stream is passed, the command's stdout will be written to it instead. + stderr: If `True`, the command's stderr will be captured and returned in the response. + If a binary stream is passed, the command's stderr will be written to it instead. + decode: Decode captured stdout/stderr in `ExecResponse` using this encoding as strings. + If you expect a binary output, set `stdout` and/or `stderr` to a binary stream or set this parameter to `None`. + raise_on_error: If `True`, an exception will be raised if the command exits with a non-zero status code. + Note that other exceptions may still be raised for other types of errors, such as connection issues, missing + pod or timeouts. + timeout: If set, the maximum amount of time in seconds to wait for the command to complete before raising a + timeout exception. By default, there is no timeout and the method will wait until the command completes + or an error occurs. + """ + commands = [command] if isinstance(command, str) else list(command) + params = {"command": commands, "stdout": stdout, "stderr": stderr, "stdin": stdin} + return await self._client.ws_request( + "exec", + name=name, + namespace=namespace, + params=params, + raise_on_error=raise_on_error, + decode=decode, + timeout=timeout, + ) + @overload async def apply( self, diff --git a/src/lightkube/core/client.py b/src/lightkube/core/client.py index 8a92050..d74417b 100644 --- a/src/lightkube/core/client.py +++ b/src/lightkube/core/client.py @@ -1,4 +1,4 @@ -from typing import Dict, Iterable, Iterator, List, Optional, Tuple, Type, TypeVar, Union, overload +from typing import BinaryIO, Dict, Iterable, Iterator, List, Optional, Tuple, Type, TypeVar, Union, overload import httpx @@ -8,7 +8,7 @@ from ..config.kubeconfig import KubeConfig, SingleConfig from ..core import resource as r from ..core.exceptions import ConditionError, ObjectDeleted -from ..types import CascadeType, OnErrorHandler, PatchType, on_error_raise +from ..types import CascadeType, ExecResponse, OnErrorHandler, PatchType, on_error_raise from .generic_client import GenericSyncClient, ListIterable from .internal_resources import core_v1 from .selector import build_selector @@ -741,6 +741,52 @@ def log( self._client.raise_for_status(resp) return (line + "\n" if newlines else line for line in resp.iter_lines()) + def exec( + self, + name: str, + *, + namespace: Optional[str] = None, + command: Union[str, Iterable[str]], + stdin: Union[str, bytes, BinaryIO, None] = None, + stdout: Union[BinaryIO, bool] = False, + stderr: Union[BinaryIO, bool] = False, + decode: Optional[str] = "utf-8", + raise_on_error: bool = False, + timeout: Optional[float] = None, + ) -> ExecResponse: + """Execute a command in a Pod and return stdout/stderr. + + Parameters: + name: Name of the Pod. + namespace: Name of the namespace containing the Pod. + command: Command to execute in the Pod. + stdin: Data to send to stdin. This can be either a string, bytes or a binary stream. + Strings will be encoded as utf-8 before sending. + stdout: If `True`, the command's stdout will be captured and returned in the response. + If a binary stream is passed, the command's stdout will be written to it instead. + stderr: If `True`, the command's stderr will be captured and returned in the response. + If a binary stream is passed, the command's stderr will be written to it instead. + decode: Decode captured stdout/stderr in `ExecResponse` using this encoding as strings. + If you expect a binary output, set `stdout` and/or `stderr` to a binary stream or set this parameter to `None`. + raise_on_error: If `True`, an exception will be raised if the command exits with a non-zero status code. + Note that other exceptions may still be raised for other types of errors, such as connection issues, missing + pod or timeouts. + timeout: If set, the maximum amount of time in seconds to wait for the command to complete before raising a + timeout exception. By default, there is no timeout and the method will wait until the command completes + or an error occurs. + """ + commands = [command] if isinstance(command, str) else list(command) + params = {"command": commands, "stdout": stdout, "stderr": stderr, "stdin": stdin} + return self._client.ws_request( + "exec", + name=name, + namespace=namespace, + params=params, + raise_on_error=raise_on_error, + decode=decode, + timeout=timeout, + ) + @overload def apply( self, diff --git a/src/lightkube/core/generic_client.py b/src/lightkube/core/generic_client.py index 44aafe0..604ae39 100644 --- a/src/lightkube/core/generic_client.py +++ b/src/lightkube/core/generic_client.py @@ -9,9 +9,11 @@ from ..config import client_adapter from ..config.kubeconfig import DEFAULT_KUBECONFIG, KubeConfig, SingleConfig -from ..types import OnErrorAction, OnErrorHandler, PatchType, on_error_raise +from ..types import ExecResponse, OnErrorAction, OnErrorHandler, PatchType, on_error_raise from . import resource as r from .exceptions import ApiError, NotReadyError +from .internal_models import core_v1_res +from .websocket import AsyncWebsocketDriver, WebsocketDriver ALL_NS = "*" @@ -33,6 +35,7 @@ def transform_exception(e: httpx.HTTPError): "post": "POST", "put": "PUT", "watch": "GET", + "exec": "GET", } @@ -179,7 +182,7 @@ def prepare_request( real_method = "watch" if watch else method api_info = r.api_info(res) - if real_method not in api_info.verbs: + if real_method not in api_info.verbs and real_method != "exec": if watch: raise ValueError(f"Resource '{res.__name__}' is not watchable") else: @@ -233,7 +236,7 @@ def prepare_request( data["kind"] = api_info.resource.kind path.append(api_info.plural) - if method in ("delete", "get", "patch", "put") or api_info.action: + if method in ("delete", "get", "patch", "put", "exec") or api_info.action: if name is None and method == "put": name = obj.metadata.name if name is None: @@ -242,6 +245,8 @@ def prepare_request( if api_info.action: path.append(api_info.action) + elif method == "exec": + path.append("exec") http_method = METHOD_MAPPING[method] if http_method == "DELETE": @@ -263,7 +268,7 @@ def raise_for_status(resp): except httpx.HTTPError as e: raise transform_exception(e) from e - def build_adapter_request(self, br: BasicRequest): + def build_adapter_request(self, br: BasicRequest) -> httpx.Request: return self._client.build_request(br.method, br.url, params=br.params, json=br.data, headers=br.headers) def convert_to_resource(self, res: Type[r.Resource], item: dict) -> r.Resource: @@ -341,6 +346,31 @@ def request( resp = self.send(req) return self.handle_response(method, resp, br) + def ws_request( + self, + method, + name=None, + namespace=None, + params: Optional[dict] = None, + raise_on_error: bool = False, + decode: Optional[str] = None, + timeout: Optional[float] = None, + ) -> ExecResponse: + stdin = stdout = stderr = None + if "stdin" in params: + stdin = params["stdin"] + params["stdin"] = True + if "stdout" in params: + stdout = params["stdout"] + params["stdout"] = True + if "stderr" in params: + stderr = params["stderr"] + params["stderr"] = True + br = self.prepare_request(method, core_v1_res.Pod, None, name, namespace, params=params) + return WebsocketDriver(self._client, br, timeout=timeout).write_and_read( + stdin=stdin, stdout=stdout, stderr=stderr, raise_on_error=raise_on_error, decode=decode + ) + def list_chunks(self, br: BasicRequest) -> Iterator[Tuple[str, Iterator]]: cont = True while cont: @@ -399,6 +429,31 @@ async def request( resp = await self.send(req) return self.handle_response(method, resp, br) + async def ws_request( + self, + method, + name=None, + namespace=None, + params: Optional[dict] = None, + raise_on_error: bool = False, + decode: Optional[str] = None, + timeout: Optional[float] = None, + ) -> ExecResponse: + stdin = stdout = stderr = None + if "stdin" in params: + stdin = params["stdin"] + params["stdin"] = True + if "stdout" in params: + stdout = params["stdout"] + params["stdout"] = True + if "stderr" in params: + stderr = params["stderr"] + params["stderr"] = True + br = self.prepare_request(method, core_v1_res.Pod, None, name, namespace, params=params) + return await AsyncWebsocketDriver(self._client, br, timeout=timeout).write_and_read( + stdin=stdin, stdout=stdout, stderr=stderr, raise_on_error=raise_on_error, decode=decode + ) + async def list_chunks(self, br: BasicRequest) -> AsyncIterator[Tuple[str, Iterator]]: cont = True while cont: diff --git a/src/lightkube/core/internal_models.py b/src/lightkube/core/internal_models.py index c95ac82..6e2cfa1 100644 --- a/src/lightkube/core/internal_models.py +++ b/src/lightkube/core/internal_models.py @@ -1,7 +1,10 @@ import sys +from . import resource as res + try: from ..models import autoscaling_v1, core_v1, meta_v1 + from ..resources import core_v1 as core_v1_res except ImportError: if sys.modules["__main__"].__package__ != "mkdocs": # we ignore this import error during documentation generation raise @@ -24,3 +27,13 @@ class ResourceRequirements: core_v1 = mock.Mock() core_v1.ResourceRequirements = ResourceRequirements + + class Pod(res.NamespacedResourceG): + _api_info = res.ApiInfo( + resource=res.ResourceDef("", "v1", "Pod"), + plural="pods", + verbs=[], + ) + + core_v1_res = mock.Mock() + core_v1_res.Pod = Pod diff --git a/src/lightkube/core/websocket.py b/src/lightkube/core/websocket.py new file mode 100644 index 0000000..8f1db92 --- /dev/null +++ b/src/lightkube/core/websocket.py @@ -0,0 +1,223 @@ +import io +import json +import queue +from time import monotonic +from typing import TYPE_CHECKING, Any, BinaryIO, ClassVar, Iterable, List, Optional, TypeVar, Union, overload + +import httpx +from httpx_ws import aconnect_ws, connect_ws + +from ..types import ExecResponse +from .exceptions import ApiError + +if TYPE_CHECKING: + from .generic_client import BasicRequest + +STDIN_CHANNEL: int = 0 +STDOUT_CHANNEL: int = 1 +STDERR_CHANNEL: int = 2 +ERROR_CHANNEL: int = 3 +CLOSE_STDIN: bytes = bytes((255, STDIN_CHANNEL)) +STDIN_BYTES: bytes = bytes((STDIN_CHANNEL,)) + +T = TypeVar("T") + + +@overload +def first(iterable: Iterable[T], default: None) -> Optional[T]: ... + + +@overload +def first(iterable: Iterable[T], default: T) -> T: ... + + +def first(iterable: Iterable[T], default: Optional[T] = None) -> Optional[T]: + iterator = iter(iterable) + return next(iterator, default) + + +class BudgetTimer: + def __init__(self, timeout: Optional[float], timeout_msg: str) -> None: + self._deadline = monotonic() + timeout if timeout is not None else None + self._timeout_msg = timeout_msg + + def budget(self) -> Optional[float]: + if self._deadline is None: + return None + budget = self._deadline - monotonic() + if budget <= 0: + raise httpx.ReadTimeout(self._timeout_msg) + return budget + + +class BaseWebsocketDriver: + PROTOCOLS: ClassVar[List[str]] = ["v5.channel.k8s.io", "v4.channel.k8s.io"] + _TIMEOUT_MSG: ClassVar[str] = "Timeout while waiting complete response from exec command" + _ws: Any + + def __init__(self, client: Union[httpx.Client, httpx.AsyncClient], br: "BasicRequest", timeout: Optional[float] = None): + self._timeout = timeout + ws_func = connect_ws if isinstance(client, httpx.Client) else aconnect_ws + self._ws = ws_func( + br.url, + client, # type: ignore # this is either httpx.Client or httpx.AsyncClient, both of which are accepted by the respective connect_ws function + subprotocols=self.PROTOCOLS, + params=br.params, + ) + + def ensure_stdin_supported(self, ws): + if ws.subprotocol != self.PROTOCOLS[0]: + raise ApiError( + status={ + "status": "Failure", + "message": f"Only subprotocol {self.PROTOCOLS[0]} supports writing to stdin", + } + ) + + def chunk_stdin(self, msg: Union[str, bytes, BinaryIO], chunk_size: int = 128 * 1024) -> Iterable[bytes]: + if hasattr(msg, "read"): + while True: + content = msg.read(chunk_size) + if not content: + break + yield content + else: + if isinstance(msg, str): + msg = msg.encode("utf-8") + yield msg + + +class WebsocketDriver(BaseWebsocketDriver): + def write_stdin(self, ws, msg: Union[str, bytes, BinaryIO], close: bool = False): + self.ensure_stdin_supported(ws) + for chunk in self.chunk_stdin(msg): + ws.send_bytes(STDIN_BYTES + chunk) + if close: + ws.send_bytes(CLOSE_STDIN) # Close connection + + def write_and_read( + self, + stdin: Union[str, bytes, BinaryIO, None] = None, + stdout: Union[BinaryIO, bool] = False, + stderr: Union[BinaryIO, bool] = False, + decode: Optional[str] = None, + raise_on_error: bool = False, + ) -> ExecResponse: + with self._ws as ws: + if stdin is not None: + self.write_stdin(ws, stdin, close=True) + + return self.read_output(ws, stdout=stdout, stderr=stderr, raise_on_error=raise_on_error, decode=decode) + + def read_output( + self, + ws, + stdout: Union[BinaryIO, bool] = False, + stderr: Union[BinaryIO, bool] = False, + raise_on_error: bool = False, + decode: Optional[str] = None, + ) -> ExecResponse: + accumulator = ExecAccumulator(stdout=stdout, stderr=stderr, raise_on_error=raise_on_error, decode=decode) + timer = BudgetTimer(self._timeout, self._TIMEOUT_MSG) + while True: + budget = timer.budget() + try: + message = ws.receive_bytes(timeout=budget) + except queue.Empty as e: + raise httpx.ReadTimeout(self._TIMEOUT_MSG) from e + channel, message = message[0], message[1:] + response = accumulator.feed(channel, message) + if response is not None: + return response + + +class AsyncWebsocketDriver(BaseWebsocketDriver): + async def write_stdin(self, ws, msg: Union[str, bytes, BinaryIO], close: bool = False): + self.ensure_stdin_supported(ws) + for chunk in self.chunk_stdin(msg): + await ws.send_bytes(STDIN_BYTES + chunk) + if close: + await ws.send_bytes(CLOSE_STDIN) # Close connection + + async def write_and_read( + self, + stdin: Union[str, bytes, BinaryIO, None] = None, + stdout: Union[BinaryIO, bool] = False, + stderr: Union[BinaryIO, bool] = False, + decode: Optional[str] = None, + raise_on_error: bool = False, + ) -> ExecResponse: + async with self._ws as ws: + if stdin is not None: + await self.write_stdin(ws, stdin, close=True) + + return await self.read_output(ws, stdout=stdout, stderr=stderr, raise_on_error=raise_on_error, decode=decode) + + async def read_output( + self, + ws, + stdout: Union[BinaryIO, bool] = False, + stderr: Union[BinaryIO, bool] = False, + raise_on_error: bool = False, + decode: Optional[str] = None, + ) -> ExecResponse: + accumulator = ExecAccumulator(stdout=stdout, stderr=stderr, raise_on_error=raise_on_error, decode=decode) + timer = BudgetTimer(self._timeout, self._TIMEOUT_MSG) + while True: + budget = timer.budget() + try: + message = await ws.receive_bytes(timeout=budget) + except TimeoutError as e: + raise httpx.ReadTimeout(self._TIMEOUT_MSG) from e + channel, message = message[0], message[1:] + response = accumulator.feed(channel, message) + if response is not None: + return response + + +class ExecAccumulator: + def __init__( + self, + *, + stdout: Union[BinaryIO, bool], + stderr: Union[BinaryIO, bool], + raise_on_error: bool, + decode: Optional[str], + ) -> None: + self._raise_on_error = raise_on_error + self._decode = decode + self._capture_stdout = stdout is True + self._capture_stderr = stderr is True + self._stdout = io.BytesIO() if stdout is True else stdout + self._stderr = io.BytesIO() if stderr is True else stderr + + def feed(self, channel: int, message: bytes) -> Optional[ExecResponse]: + if channel == STDOUT_CHANNEL: + if self._stdout: + self._stdout.write(message) + return None + if channel == STDERR_CHANNEL: + if self._stderr: + self._stderr.write(message) + return None + if channel != ERROR_CHANNEL: + return None + + exit_code = 0 + error = ApiError(status=json.loads(message)) + if error.status.status == "Failure": + if error.status.reason != "NonZeroExitCode" or self._raise_on_error: + raise error + details = error.status.details + if details and details.causes: + exit_code = first( + (int(cause.message) for cause in details.causes if cause.reason == "ExitCode" and cause.message), -1 + ) + + stdout_value = stderr_value = None + if self._capture_stdout: + stdout_value = self._stdout.getvalue() if self._decode is None else self._stdout.getvalue().decode(self._decode) # type: ignore # _stdout is always BytesIO when _capture_stdout is True, so it has getvalue() method + if self._capture_stderr: + stderr_value = self._stderr.getvalue() if self._decode is None else self._stderr.getvalue().decode(self._decode) # type: ignore # _stdout is always BytesIO when _capture_stderr is True, so it has getvalue() method + + return ExecResponse(stdout=stdout_value, stderr=stderr_value, exit_code=exit_code) diff --git a/src/lightkube/types.py b/src/lightkube/types.py index 87dafce..4870867 100644 --- a/src/lightkube/types.py +++ b/src/lightkube/types.py @@ -1,6 +1,7 @@ import enum import typing from dataclasses import dataclass +from typing import NamedTuple, Optional, Union class PatchType(enum.Enum): @@ -66,3 +67,18 @@ def on_error_stop(e: Exception, count: int) -> OnErrorResult: def on_error_retry(e: Exception, count: int) -> OnErrorResult: """Retry to perform the API call again from the last version""" return OnErrorResult(OnErrorAction.RETRY) + + +class ExecResponse(NamedTuple): + """ + Response from an exec command, containing stdout, stderr and exit code. + + Attributes: + stdout: The command's stdout, if captured. + stderr: The command's stderr, if captured. + exit_code: The command's exit code. + """ + + stdout: Optional[Union[str, bytes]] = None + stderr: Optional[Union[str, bytes]] = None + exit_code: int = 0 diff --git a/tests/fake_ws.py b/tests/fake_ws.py new file mode 100644 index 0000000..34d017c --- /dev/null +++ b/tests/fake_ws.py @@ -0,0 +1,85 @@ +import json +from typing import Dict, List, Optional + + +class _AwaitableResult: + __slots__ = ("_value",) + + def __init__(self, value) -> None: + self._value = value + + def __await__(self): + if False: # needed to make this a generator function + yield None + return self._value + + +class _AwaitableBytes(_AwaitableResult): + def __getitem__(self, key): + return self._value[key] + + def __len__(self) -> int: + return len(self._value) + + def __iter__(self): + return iter(self._value) + + def __bytes__(self) -> bytes: + return self._value + + +class FakeWS: + subprotocol = "v5.channel.k8s.io" + + def __init__(self, messages, exit_code: int = 0): + # messages: list of (channel, payload) tuples + self._messages = [] + for ch, payload in messages: + if isinstance(payload, str): + payload = payload.encode("utf-8") + self._messages.append(bytes([ch]) + payload) + # append an ERROR channel status message reflecting exit_code + status: Dict[str, object] + if exit_code == 0: + status = {"status": "Success"} + else: + status = { + "status": "Failure", + "reason": "NonZeroExitCode", + "details": {"causes": [{"reason": "ExitCode", "message": str(exit_code)}]}, + "message": "command exited", + } + self._messages.append(bytes([3]) + json.dumps(status).encode("utf-8")) + self.sent: List[bytes] = [] + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return False + + def receive_bytes(self, timeout: Optional[float] = None): + return _AwaitableBytes(self._messages.pop(0)) + + def send_bytes(self, data): + self.sent.append(data) + return _AwaitableResult(None) + + @staticmethod + def make_connect(messages, exit_code: int = 0): + def _connect(url, client, subprotocols, params): + return FakeWS(messages, exit_code) + + return _connect + + def as_connect(self): + def _connect(url, client, subprotocols, params): + return self + + return _connect diff --git a/tests/test_async_client.py b/tests/test_async_client.py index 6c550b9..4127981 100644 --- a/tests/test_async_client.py +++ b/tests/test_async_client.py @@ -1,3 +1,4 @@ +import io import unittest.mock import warnings @@ -8,10 +9,14 @@ import lightkube from lightkube import types from lightkube.config.kubeconfig import KubeConfig +from lightkube.core import websocket +from lightkube.core.websocket import STDERR_CHANNEL, STDOUT_CHANNEL from lightkube.generic_resource import create_global_resource from lightkube.models.meta_v1 import ObjectMeta from lightkube.resources.core_v1 import Binding, Node, Pod +from lightkube.types import ExecResponse +from .fake_ws import FakeWS from .test_client import ( json_contains, make_wait_custom, @@ -432,6 +437,91 @@ async def test_pod_log(client: lightkube.AsyncClient): await client.close() +@pytest.mark.asyncio +async def test_exec_captures_stdout_stderr(client: lightkube.AsyncClient, monkeypatch) -> None: + messages = [(STDOUT_CHANNEL, b"out"), (STDERR_CHANNEL, b"err")] + + monkeypatch.setattr(websocket, "aconnect_ws", FakeWS.make_connect(messages, exit_code=0)) + + res = await client.exec("pod-1", command=["/bin/echo", "hi"], stdout=True, stderr=True, decode=None) + assert res.stdout == b"out" + assert res.stderr == b"err" + assert res.exit_code == 0 + + messages = [(STDOUT_CHANNEL, b"out"), (STDERR_CHANNEL, b"err")] + monkeypatch.setattr(websocket, "aconnect_ws", FakeWS.make_connect(messages, exit_code=0)) + res = await client.exec("pod-1", command=["/bin/echo", "hi"], stdout=True, stderr=True) + assert res.stdout == "out" + assert res.stderr == "err" + assert res.exit_code == 0 + + res = await client.exec("pod-1", command=["/bin/echo", "hi"]) + assert res.stdout is None + assert res.stderr is None + assert res.exit_code == 0 + + +@pytest.mark.asyncio +async def test_exec_raises_on_non_zero_exit(client: lightkube.AsyncClient, monkeypatch) -> None: + messages = [] + + monkeypatch.setattr(websocket, "aconnect_ws", FakeWS.make_connect(messages, exit_code=12)) + + with pytest.raises(lightkube.ApiError): + await client.exec("pod-1", command="/bin/false", raise_on_error=True) + + res = await client.exec("pod-1", command="/bin/false", raise_on_error=False) + assert res.exit_code == 12 + + +@pytest.mark.asyncio +async def test_exec_writes_to_provided_streams(client: lightkube.AsyncClient, monkeypatch) -> None: + from lightkube.core.websocket import STDERR_CHANNEL, STDOUT_CHANNEL + + messages = [(STDOUT_CHANNEL, b"out-stream"), (STDERR_CHANNEL, b"err-stream")] + monkeypatch.setattr(websocket, "aconnect_ws", FakeWS.make_connect(messages, exit_code=0)) + + out_stream = io.BytesIO() + err_stream = io.BytesIO() + + res = await client.exec("pod-stream", command=["/bin/echo"], stdout=out_stream, stderr=err_stream) + + # When passing streams, ExecResponse stdout/stderr are None, but streams receive data + assert isinstance(res, ExecResponse) + assert res.stdout is None and res.stderr is None + assert out_stream.getvalue() == b"out-stream" + assert err_stream.getvalue() == b"err-stream" + + +@pytest.mark.asyncio +async def test_exec_stdin_variants(client: lightkube.AsyncClient, monkeypatch) -> None: + messages = [] + + # bytes + ws = FakeWS(messages, exit_code=0) + monkeypatch.setattr(websocket, "aconnect_ws", ws.as_connect()) + await client.exec("pod-stdin", command=["/bin/cmd"], stdin=b"byte-input") + assert any(b"byte-input" in s for s in ws.sent) + + # str + ws2 = FakeWS(messages, exit_code=0) + monkeypatch.setattr(websocket, "aconnect_ws", ws2.as_connect()) + await client.exec("pod-stdin", command=["/bin/cmd"], stdin="text-input") + assert any(b"text-input" in s for s in ws2.sent) + + # file-like + ws3 = FakeWS(messages, exit_code=0) + monkeypatch.setattr(websocket, "aconnect_ws", ws3.as_connect()) + await client.exec("pod-stdin", command=["/bin/cmd"], stdin=io.BytesIO(b"stream-input")) + assert any(b"stream-input" in s for s in ws3.sent) + + # None (no stdin) -> nothing sent + ws4 = FakeWS(messages, exit_code=0) + monkeypatch.setattr(websocket, "aconnect_ws", ws4.as_connect()) + await client.exec("pod-stdin", command=["/bin/cmd"], stdin=None) + assert ws4.sent == [] + + @respx.mock @pytest.mark.asyncio async def test_apply_namespaced(client: lightkube.AsyncClient): diff --git a/tests/test_client.py b/tests/test_client.py index 0d9a63f..37cd0c2 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,3 +1,4 @@ +import io import json import unittest.mock import warnings @@ -12,9 +13,14 @@ from lightkube import types from lightkube.config.kubeconfig import KubeConfig, SingleConfig from lightkube.config.models import Cluster, Context, User +from lightkube.core import websocket +from lightkube.core.websocket import STDERR_CHANNEL, STDOUT_CHANNEL from lightkube.generic_resource import create_global_resource from lightkube.models.meta_v1 import ObjectMeta from lightkube.resources.core_v1 import Binding, Node, Pod +from lightkube.types import ExecResponse + +from .fake_ws import FakeWS warnings.filterwarnings("ignore", category=DeprecationWarning) @@ -729,6 +735,89 @@ def test_pod_log(client: lightkube.Client) -> None: assert lines == result +def test_exec_captures_stdout_stderr(client: lightkube.Client, monkeypatch) -> None: + messages = [(STDOUT_CHANNEL, b"out"), (STDERR_CHANNEL, b"err")] + + monkeypatch.setattr(websocket, "connect_ws", FakeWS.make_connect(messages, exit_code=0)) + + res = client.exec("pod-1", command=["/bin/echo", "hi"], stdout=True, stderr=True, decode=None) + assert res.stdout == b"out" + assert res.stderr == b"err" + assert res.exit_code == 0 + + messages = [(STDOUT_CHANNEL, b"out"), (STDERR_CHANNEL, b"err")] + monkeypatch.setattr(websocket, "connect_ws", FakeWS.make_connect(messages, exit_code=0)) + res = client.exec("pod-1", command=["/bin/echo", "hi"], stdout=True, stderr=True) + assert res.stdout == "out" + assert res.stderr == "err" + assert res.exit_code == 0 + + res = client.exec("pod-1", command=["/bin/echo", "hi"]) + assert res.stdout is None + assert res.stderr is None + assert res.exit_code == 0 + + +def test_exec_raises_on_non_zero_exit(client: lightkube.Client, monkeypatch) -> None: + messages = [] + from lightkube.core import websocket + + monkeypatch.setattr(websocket, "connect_ws", FakeWS.make_connect(messages, exit_code=12)) + + with pytest.raises(lightkube.ApiError): + client.exec("pod-1", command="/bin/false", raise_on_error=True) + + res = client.exec("pod-1", command="/bin/false", raise_on_error=False) + assert res.exit_code == 12 + + +def test_exec_writes_to_provided_streams(client: lightkube.Client, monkeypatch) -> None: + import io + + messages = [(STDOUT_CHANNEL, b"out-stream"), (STDERR_CHANNEL, b"err-stream")] + + monkeypatch.setattr(websocket, "connect_ws", FakeWS.make_connect(messages, exit_code=0)) + + out_stream = io.BytesIO() + err_stream = io.BytesIO() + + res = client.exec("pod-stream", command=["/bin/echo"], stdout=out_stream, stderr=err_stream) + + # When passing streams, ExecResponse stdout/stderr are None, but streams receive data + assert isinstance(res, ExecResponse) + assert res.stdout is None and res.stderr is None + assert out_stream.getvalue() == b"out-stream" + assert err_stream.getvalue() == b"err-stream" + + +def test_exec_stdin_variants(client: lightkube.Client, monkeypatch) -> None: + messages = [] + + # bytes + ws = FakeWS(messages, exit_code=0) + monkeypatch.setattr(websocket, "connect_ws", ws.as_connect()) + client.exec("pod-stdin", command=["/bin/cmd"], stdin=b"byte-input") + assert any(b"byte-input" in s for s in ws.sent) + + # str + ws2 = FakeWS(messages, exit_code=0) + monkeypatch.setattr(websocket, "connect_ws", ws2.as_connect()) + client.exec("pod-stdin", command=["/bin/cmd"], stdin="text-input") + assert any(b"text-input" in s for s in ws2.sent) + + # file-like + ws3 = FakeWS(messages, exit_code=0) + monkeypatch.setattr(websocket, "connect_ws", ws3.as_connect()) + client.exec("pod-stdin", command=["/bin/cmd"], stdin=io.BytesIO(b"stream-input")) + assert any(b"stream-input" in s for s in ws3.sent) + + # None (no stdin) -> nothing sent + ws4 = FakeWS(messages, exit_code=0) + monkeypatch.setattr(websocket, "connect_ws", ws4.as_connect()) + client.exec("pod-stdin", command=["/bin/cmd"], stdin=None) + assert ws4.sent == [] + + @respx.mock def test_apply_namespaced(client: lightkube.Client) -> None: req = respx.patch("https://localhost:9443/api/v1/namespaces/default/pods/xy?fieldManager=test").respond( diff --git a/uv.lock b/uv.lock index 28c6b29..fc0984a 100644 --- a/uv.lock +++ b/uv.lock @@ -821,6 +821,60 @@ http2 = [ { name = "h2", version = "4.3.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.9'" }, ] +[[package]] +name = "httpx-ws" +version = "0.6.2" +source = { registry = "https://pypi.org/simple" } +resolution-markers = [ + "python_full_version < '3.9'", +] +dependencies = [ + { name = "anyio", version = "4.5.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.9'" }, + { name = "httpcore", marker = "python_full_version < '3.9'" }, + { name = "httpx", marker = "python_full_version < '3.9'" }, + { name = "wsproto", version = "1.2.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.9'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/71/55/4ee9f216bfda692e9739823e73765b7f66f088e6ac1de5fece14e87ca153/httpx_ws-0.6.2.tar.gz", hash = "sha256:b07446b9067a30f1012fa9851fdfd14207012cd657c485565884f90553d0854c", size = 23430, upload-time = "2024-10-07T07:32:54.332Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b2/36/69a5a5da21f420ac67aaeafd963713db829ab1be307b0c5cbf80d846144b/httpx_ws-0.6.2-py3-none-any.whl", hash = "sha256:24f87427acb757ada200aeab016cc429fa0bc71b0730429c37634867194e305c", size = 14138, upload-time = "2024-10-07T07:32:56.366Z" }, +] + +[[package]] +name = "httpx-ws" +version = "0.7.2" +source = { registry = "https://pypi.org/simple" } +resolution-markers = [ + "python_full_version == '3.9.*'", +] +dependencies = [ + { name = "anyio", version = "4.12.1", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version == '3.9.*'" }, + { name = "httpcore", marker = "python_full_version == '3.9.*'" }, + { name = "httpx", marker = "python_full_version == '3.9.*'" }, + { name = "wsproto", version = "1.2.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version == '3.9.*'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/56/ba/e310ccdb8f18a2b894bfacd085ef390cf6cc70bb10ff9f109d58d94f6b47/httpx_ws-0.7.2.tar.gz", hash = "sha256:93edea6c8fc313464fc287bff7d2ad20e6196b7754c76f946f73b4af79886d4e", size = 24513, upload-time = "2025-03-28T13:20:03.039Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/03/3d/2113a5c7af9a13663fa026882d0302ed4142960388536f885dacd6be7038/httpx_ws-0.7.2-py3-none-any.whl", hash = "sha256:dd7bf9dbaa96dcd5cef1af3a7e1130cfac068bebecce25a74145022f5a8427a3", size = 14424, upload-time = "2025-03-28T13:20:04.238Z" }, +] + +[[package]] +name = "httpx-ws" +version = "0.8.2" +source = { registry = "https://pypi.org/simple" } +resolution-markers = [ + "python_full_version >= '3.10'", +] +dependencies = [ + { name = "anyio", version = "4.12.1", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, + { name = "httpcore", marker = "python_full_version >= '3.10'" }, + { name = "httpx", marker = "python_full_version >= '3.10'" }, + { name = "wsproto", version = "1.3.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/4a/32/6f7198f55d94063ea84487a31cdd3e149d2702dc0804fc5de06ed12ef2c2/httpx_ws-0.8.2.tar.gz", hash = "sha256:ba0d4aa76e1c8a27bd5e88984ecdcdc28f7bf30b40cb0989a4c1438d07fa52c7", size = 105734, upload-time = "2025-11-07T12:57:36.566Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/29/cd/2008972ddc4c2139b9813d8a097e53dcc74b2a16a85b4069294457954232/httpx_ws-0.8.2-py3-none-any.whl", hash = "sha256:f8898ddb84cbf98c562e8e796675bc68c215fa1d453d54a7fcd935aca8198cc8", size = 15404, upload-time = "2025-11-07T12:57:35.176Z" }, +] + [[package]] name = "hyperframe" version = "6.0.1" @@ -1008,10 +1062,13 @@ wheels = [ [[package]] name = "lightkube" -version = "0.19.1" +version = "0.20.0" source = { editable = "." } dependencies = [ { name = "httpx", extra = ["http2"] }, + { name = "httpx-ws", version = "0.6.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.9'" }, + { name = "httpx-ws", version = "0.7.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version == '3.9.*'" }, + { name = "httpx-ws", version = "0.8.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, { name = "lightkube-models", version = "1.34.0.8", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.9'" }, { name = "lightkube-models", version = "1.35.0.8", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.9'" }, { name = "pyyaml" }, @@ -1093,6 +1150,7 @@ testing = [ [package.metadata] requires-dist = [ { name = "httpx", extras = ["http2"], specifier = ">=0.28.1,<1.0.0" }, + { name = "httpx-ws", specifier = ">=0.6.2" }, { name = "jinja2", marker = "extra == 'jinja-templates'" }, { name = "lightkube-models", specifier = ">=1.15.12.0" }, { name = "pyyaml" }, @@ -2487,6 +2545,37 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0b/2c/87f3254fd8ffd29e4c02732eee68a83a1d3c346ae39bc6822dcbcb697f2b/wheel-0.45.1-py3-none-any.whl", hash = "sha256:708e7481cc80179af0e556bbf0cc00b8444c7321e2700b8d8580231d13017248", size = 72494, upload-time = "2024-11-23T00:18:21.207Z" }, ] +[[package]] +name = "wsproto" +version = "1.2.0" +source = { registry = "https://pypi.org/simple" } +resolution-markers = [ + "python_full_version == '3.9.*'", + "python_full_version < '3.9'", +] +dependencies = [ + { name = "h11", marker = "python_full_version < '3.10'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c9/4a/44d3c295350d776427904d73c189e10aeae66d7f555bb2feee16d1e4ba5a/wsproto-1.2.0.tar.gz", hash = "sha256:ad565f26ecb92588a3e43bc3d96164de84cd9902482b130d0ddbaa9664a85065", size = 53425, upload-time = "2022-08-23T19:58:21.447Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/78/58/e860788190eba3bcce367f74d29c4675466ce8dddfba85f7827588416f01/wsproto-1.2.0-py3-none-any.whl", hash = "sha256:b9acddd652b585d75b20477888c56642fdade28bdfd3579aa24a4d2c037dd736", size = 24226, upload-time = "2022-08-23T19:58:19.96Z" }, +] + +[[package]] +name = "wsproto" +version = "1.3.2" +source = { registry = "https://pypi.org/simple" } +resolution-markers = [ + "python_full_version >= '3.10'", +] +dependencies = [ + { name = "h11", marker = "python_full_version >= '3.10'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c7/79/12135bdf8b9c9367b8701c2c19a14c913c120b882d50b014ca0d38083c2c/wsproto-1.3.2.tar.gz", hash = "sha256:b86885dcf294e15204919950f666e06ffc6c7c114ca900b060d6e16293528294", size = 50116, upload-time = "2025-11-20T18:18:01.871Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a4/f5/10b68b7b1544245097b2a1b8238f66f2fc6dcaeb24ba5d917f52bd2eed4f/wsproto-1.3.2-py3-none-any.whl", hash = "sha256:61eea322cdf56e8cc904bd3ad7573359a242ba65688716b0710a5eb12beab584", size = 24405, upload-time = "2025-11-20T18:18:00.454Z" }, +] + [[package]] name = "zipp" version = "3.20.2"