Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,27 @@ for line in client.log('my-pod', follow=True):
print(line)
```

Execute a command inside a pod
```python
from lightkube import Client

client = Client()

# Capture stdout or raise ApiError if error code is != 0
res = client.exec('my-pod', namespace='default', command=['ls', '-l', '/'],
stdout=True, raise_on_error=True)
print(res.stdout)

# Send data to stdin and capture output
res = client.exec('my-pod', namespace='default', command=['cat'],
stdin='hello\n', stdout=True)
print(res.stdout)
print(res.exit_code)
```

## Unsupported features

The following features are not supported at the moment:

* Special subresources `attach`, `exec`, `portforward` and `proxy`.
* `auth-provider` authentication method is not supported. The supported
authentication methods are `token`, `username` + `password` and `exec`.
* Special subresources `attach`, `portforward` and `proxy`.
* `auth-provider` authentication method is not supported. The supported authentication methods are `token`, `username` + `password` and `exec`.
19 changes: 19 additions & 0 deletions docs/async-usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,22 @@ async def example():
async for line in client.log('my-pod', follow=True):
print(line)
```

Execute a command inside a pod
```python
from lightkube import AsyncClient

async def example():
client = AsyncClient()

# List a directory
res = await client.exec('my-pod', namespace='default', command=['ls', '-l', '/'],
stdout=True, raise_on_error=True)
print(res.stdout)

# Send data to stdin and capture output
res = await client.exec('my-pod', namespace='default', command=['cat'],
stdin='hello\n', stdout=True)
print(res.stdout)
print(res.exit_code)
```
69 changes: 69 additions & 0 deletions e2e-tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,3 +437,72 @@ async def test_load_in_cluster_generic_resources_async(sample_crd):
# Assert that we now have a generic resource for this CR
gr = get_generic_resource(cr_version, cr_kind)
assert gr is not None


def test_exec_integration_ls_and_cat():
client = Client()
pod = client.create(create_pod("exec-busybox", "sleep 300"))
try:
client.wait(Pod, pod.metadata.name, for_conditions=["Ready"])

ls_res = client.exec(
pod.metadata.name,
namespace=pod.metadata.namespace,
command=["/bin/sh", "-c", "ls /"],
stdout=True,
raise_on_error=True,
)
assert "bin" in ls_res.stdout.split()

try:
cat_res = client.exec(
pod.metadata.name,
namespace=pod.metadata.namespace,
command=["/bin/cat"],
stdin="hello from stdin\n",
stdout=True,
raise_on_error=True,
)
except ApiError as exc:
if "Only subprotocol v5.channel.k8s.io" in str(exc):
pytest.skip("stdin not supported without v5.channel.k8s.io protocol")
raise
assert cat_res.stdout == "hello from stdin\n"
assert cat_res.exit_code == 0
finally:
client.delete(Pod, pod.metadata.name)


@pytest.mark.asyncio
async def test_exec_integration_ls_and_cat_async():
client = AsyncClient()
pod = await client.create(create_pod("exec-busybox-async", "sleep 300"))
try:
await client.wait(Pod, pod.metadata.name, for_conditions=["Ready"])

ls_res = await client.exec(
pod.metadata.name,
namespace=pod.metadata.namespace,
command=["/bin/sh", "-c", "ls /"],
stdout=True,
raise_on_error=True,
)
assert "bin" in ls_res.stdout.split()

try:
cat_res = await client.exec(
pod.metadata.name,
namespace=pod.metadata.namespace,
command=["/bin/cat"],
stdin="hello from stdin\n",
stdout=True,
raise_on_error=True,
)
except ApiError as exc:
if "Only subprotocol v5.channel.k8s.io" in str(exc):
pytest.skip("stdin not supported without v5.channel.k8s.io protocol")
raise
assert cat_res.stdout == "hello from stdin\n"
finally:
await client.delete(Pod, pod.metadata.name)
await client.close()
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "lightkube"
version = "0.19.1"
version = "0.20.0"
description = "Lightweight kubernetes client library"
readme = "README.md"
authors = [
Expand All @@ -26,6 +26,7 @@ dependencies = [
"lightkube-models >= 1.15.12.0",
"httpx[http2] >= 0.28.1, < 1.0.0",
"PyYAML",
"httpx-ws>=0.6.2",
]

[project.optional-dependencies]
Expand Down
38 changes: 37 additions & 1 deletion src/lightkube/core/async_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import AsyncIterable, AsyncIterator, Dict, Iterable, List, Optional, Tuple, Type, Union, overload
from typing import AsyncIterable, AsyncIterator, BinaryIO, Dict, Iterable, List, Optional, Tuple, Type, Union, overload

import httpx

Expand All @@ -20,6 +20,7 @@
from .generic_client import GenericAsyncClient, ListAsyncIterable
from .internal_resources import core_v1
from .selector import build_selector
from .websocket import ExecResponse


class AsyncClient:
Expand Down Expand Up @@ -755,6 +756,41 @@ async def stream_log() -> AsyncIterator[str]:

return stream_log()

async def exec(
self,
name: str,
*,
namespace: Optional[str] = None,
command: Union[str, Iterable[str]],
stdin: Union[str, bytes, BinaryIO, None] = None,
stdout: Union[BinaryIO, bool] = False,
stderr: Union[BinaryIO, bool] = False,
decode: Optional[str] = "utf-8",
raise_on_error: bool = False,
) -> ExecResponse:
"""Execute a command in a Pod and return stdout/stderr.

Parameters:
name: Name of the Pod.
namespace: Name of the namespace containing the Pod.
command: Command to execute in the Pod.
stdin: Data to send to stdin. This can be either a string, bytes or a binary stream.
Strings will be encoded as utf-8 before sending.
stdout: If `True`, the command's stdout will be captured and returned in the response.
If a binary stream is passed, the command's stdout will be written to it instead.
stderr: If `True`, the command's stderr will be captured and returned in the response.
If a binary stream is passed, the command's stderr will be written to it instead.
decode: Decode captured stdout/stderr in `ExecResponse` using this encoding as strings.
If you expect a binary output, set `stdout` and/or `stderr` to a binary stream or set this parameter to `None`.
raise_on_error: If `True`, an exception will be raised if the command exits with a non-zero status code.
Note that other exceptions may still be raised for other types of errors, such as connection issues, missing pod or timeouts.
"""
commands = [command] if isinstance(command, str) else list(command)
params = {"command": commands, "stdout": stdout, "stderr": stderr, "stdin": stdin}
return await self._client.ws_request(
"exec", name=name, namespace=namespace, params=params, raise_on_error=raise_on_error, decode=decode
)

@overload
async def apply(
self,
Expand Down
38 changes: 37 additions & 1 deletion src/lightkube/core/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, Iterable, Iterator, List, Optional, Tuple, Type, TypeVar, Union, overload
from typing import BinaryIO, Dict, Iterable, Iterator, List, Optional, Tuple, Type, TypeVar, Union, overload

import httpx

Expand All @@ -12,6 +12,7 @@
from .generic_client import GenericSyncClient, ListIterable
from .internal_resources import core_v1
from .selector import build_selector
from .websocket import ExecResponse

NamespacedResource = TypeVar("NamespacedResource", bound=r.NamespacedResource)
GlobalResource = TypeVar("GlobalResource", bound=r.GlobalResource)
Expand Down Expand Up @@ -741,6 +742,41 @@ def log(
self._client.raise_for_status(resp)
return (line + "\n" if newlines else line for line in resp.iter_lines())

def exec(
self,
name: str,
*,
namespace: Optional[str] = None,
command: Union[str, Iterable[str]],
stdin: Union[str, bytes, BinaryIO, None] = None,
stdout: Union[BinaryIO, bool] = False,
stderr: Union[BinaryIO, bool] = False,
decode: Optional[str] = "utf-8",
raise_on_error: bool = False,
) -> ExecResponse:
"""Execute a command in a Pod and return stdout/stderr.

Parameters:
name: Name of the Pod.
namespace: Name of the namespace containing the Pod.
command: Command to execute in the Pod.
stdin: Data to send to stdin. This can be either a string, bytes or a binary stream.
Strings will be encoded as utf-8 before sending.
stdout: If `True`, the command's stdout will be captured and returned in the response.
If a binary stream is passed, the command's stdout will be written to it instead.
stderr: If `True`, the command's stderr will be captured and returned in the response.
If a binary stream is passed, the command's stderr will be written to it instead.
decode: Decode captured stdout/stderr in `ExecResponse` using this encoding as strings.
If you expect a binary output, set `stdout` and/or `stderr` to a binary stream or set this parameter to `None`.
raise_on_error: If `True`, an exception will be raised if the command exits with a non-zero status code.
Note that other exceptions may still be raised for other types of errors, such as connection issues, missing pod or timeouts.
"""
commands = [command] if isinstance(command, str) else list(command)
params = {"command": commands, "stdout": stdout, "stderr": stderr, "stdin": stdin}
return self._client.ws_request(
"exec", name=name, namespace=namespace, params=params, raise_on_error=raise_on_error, decode=decode
)

@overload
def apply(
self,
Expand Down
59 changes: 56 additions & 3 deletions src/lightkube/core/generic_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from ..types import OnErrorAction, OnErrorHandler, PatchType, on_error_raise
from . import resource as r
from .exceptions import ApiError, NotReadyError
from .internal_models import core_v1_res
from .websocket import AsyncWebsocketDriver, ExecResponse, WebsocketDriver

ALL_NS = "*"

Expand All @@ -33,6 +35,7 @@ def transform_exception(e: httpx.HTTPError):
"post": "POST",
"put": "PUT",
"watch": "GET",
"exec": "GET",
}


Expand Down Expand Up @@ -179,7 +182,7 @@ def prepare_request(
real_method = "watch" if watch else method

api_info = r.api_info(res)
if real_method not in api_info.verbs:
if real_method not in api_info.verbs and real_method != "exec":
if watch:
raise ValueError(f"Resource '{res.__name__}' is not watchable")
else:
Expand Down Expand Up @@ -233,7 +236,7 @@ def prepare_request(
data["kind"] = api_info.resource.kind

path.append(api_info.plural)
if method in ("delete", "get", "patch", "put") or api_info.action:
if method in ("delete", "get", "patch", "put", "exec") or api_info.action:
if name is None and method == "put":
name = obj.metadata.name
if name is None:
Expand All @@ -242,6 +245,8 @@ def prepare_request(

if api_info.action:
path.append(api_info.action)
elif method == "exec":
path.append("exec")

http_method = METHOD_MAPPING[method]
if http_method == "DELETE":
Expand All @@ -263,7 +268,7 @@ def raise_for_status(resp):
except httpx.HTTPError as e:
raise transform_exception(e) from e

def build_adapter_request(self, br: BasicRequest):
def build_adapter_request(self, br: BasicRequest) -> httpx.Request:
return self._client.build_request(br.method, br.url, params=br.params, json=br.data, headers=br.headers)

def convert_to_resource(self, res: Type[r.Resource], item: dict) -> r.Resource:
Expand Down Expand Up @@ -341,6 +346,30 @@ def request(
resp = self.send(req)
return self.handle_response(method, resp, br)

def ws_request(
self,
method,
name=None,
namespace=None,
params: Optional[dict] = None,
raise_on_error: bool = False,
decode: Optional[str] = None,
) -> ExecResponse:
stdin = stdout = stderr = None
if "stdin" in params:
stdin = params["stdin"]
params["stdin"] = True
if "stdout" in params:
stdout = params["stdout"]
params["stdout"] = True
if "stderr" in params:
stderr = params["stderr"]
params["stderr"] = True
br = self.prepare_request(method, core_v1_res.Pod, None, name, namespace, params=params)
return WebsocketDriver(self._client, br).write_and_read(
stdin=stdin, stdout=stdout, stderr=stderr, raise_on_error=raise_on_error, decode=decode
)

def list_chunks(self, br: BasicRequest) -> Iterator[Tuple[str, Iterator]]:
cont = True
while cont:
Expand Down Expand Up @@ -399,6 +428,30 @@ async def request(
resp = await self.send(req)
return self.handle_response(method, resp, br)

async def ws_request(
self,
method,
name=None,
namespace=None,
params: Optional[dict] = None,
raise_on_error: bool = False,
decode: Optional[str] = None,
) -> ExecResponse:
stdin = stdout = stderr = None
if "stdin" in params:
stdin = params["stdin"]
params["stdin"] = True
if "stdout" in params:
stdout = params["stdout"]
params["stdout"] = True
if "stderr" in params:
stderr = params["stderr"]
params["stderr"] = True
br = self.prepare_request(method, core_v1_res.Pod, None, name, namespace, params=params)
return await AsyncWebsocketDriver(self._client, br).write_and_read(
stdin=stdin, stdout=stdout, stderr=stderr, raise_on_error=raise_on_error, decode=decode
)

async def list_chunks(self, br: BasicRequest) -> AsyncIterator[Tuple[str, Iterator]]:
cont = True
while cont:
Expand Down
13 changes: 13 additions & 0 deletions src/lightkube/core/internal_models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import sys

from . import resource as res

try:
from ..models import autoscaling_v1, core_v1, meta_v1
from ..resources import core_v1 as core_v1_res
except ImportError:
if sys.modules["__main__"].__package__ != "mkdocs": # we ignore this import error during documentation generation
raise
Expand All @@ -24,3 +27,13 @@ class ResourceRequirements:

core_v1 = mock.Mock()
core_v1.ResourceRequirements = ResourceRequirements

class Pod(res.NamespacedResourceG):
_api_info = res.ApiInfo(
resource=res.ResourceDef("", "v1", "Pod"),
plural="pods",
verbs=[],
)

core_v1_res = mock.Mock()
core_v1_res.Pod = Pod
Loading
Loading