Skip to content

Commit 4049cc9

Browse files
authored
feat(server): refactor k8s client and add limiter (alibaba#429)
1 parent c02250c commit 4049cc9

18 files changed

Lines changed: 1507 additions & 903 deletions

server/src/config.py

Lines changed: 32 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -216,6 +216,38 @@ class KubernetesRuntimeConfig(BaseModel):
216216
"[Beta] Watch timeout (seconds) before restarting the informer stream."
217217
),
218218
)
219+
read_qps: float = Field(
220+
default=0.0,
221+
ge=0,
222+
description=(
223+
"Maximum read requests per second to the Kubernetes API (get/list). "
224+
"0 means unlimited (no rate limiting)."
225+
),
226+
)
227+
read_burst: int = Field(
228+
default=0,
229+
ge=0,
230+
description=(
231+
"Burst size for the read rate limiter. "
232+
"0 means use read_qps as burst (minimum 1)."
233+
),
234+
)
235+
write_qps: float = Field(
236+
default=0.0,
237+
ge=0,
238+
description=(
239+
"Maximum write requests per second to the Kubernetes API (create/delete/patch). "
240+
"0 means unlimited (no rate limiting)."
241+
),
242+
)
243+
write_burst: int = Field(
244+
default=0,
245+
ge=0,
246+
description=(
247+
"Burst size for the write rate limiter. "
248+
"0 means use write_qps as burst (minimum 1)."
249+
),
250+
)
219251
namespace: Optional[str] = Field(
220252
default=None,
221253
description="Namespace used for sandbox workloads.",

server/src/services/k8s/agent_sandbox_provider.py

Lines changed: 45 additions & 124 deletions
Original file line number | Diff line number | Diff line change
@@ -20,18 +20,16 @@
2020
import logging
2121
import re
2222
from datetime import datetime
23-
from typing import Dict, List, Any, Optional, Callable
24-
from threading import Lock
23+
from typing import Dict, List, Any, Optional
2524

2625
from kubernetes.client import (
2726
V1Container,
2827
V1EnvVar,
2928
V1ResourceRequirements,
3029
V1VolumeMount,
31-
ApiException,
3230
)
3331

34-
from src.config import AppConfig, IngressConfig, ExecdInitResources
32+
from src.config import AppConfig
3533
from src.services.helpers import format_ingress_endpoint
3634
from src.api.schema import Endpoint, ImageSpec, NetworkPolicy, Volume
3735
from src.services.k8s.agent_sandbox_template import AgentSandboxTemplateManager
@@ -42,7 +40,6 @@
4240
build_security_context_from_dict,
4341
serialize_security_context_to_dict,
4442
)
45-
from src.services.k8s.informer import WorkloadInformer
4643
from src.services.k8s.volume_helper import apply_volumes_to_pod_spec
4744
from src.services.k8s.workload_provider import WorkloadProvider
4845
from src.services.runtime_resolver import SecureRuntimeResolver
@@ -83,44 +80,24 @@ class AgentSandboxProvider(WorkloadProvider):
8380
def __init__(
8481
self,
8582
k8s_client: K8sClient,
86-
template_file_path: Optional[str] = None,
87-
shutdown_policy: str = "Delete",
88-
service_account: Optional[str] = None,
89-
ingress_config: Optional[IngressConfig] = None,
90-
enable_informer: bool = True,
91-
informer_factory: Optional[Callable[[str], WorkloadInformer]] = None,
92-
informer_resync_seconds: int = 300,
93-
informer_watch_timeout_seconds: int = 60,
9483
app_config: Optional[AppConfig] = None,
95-
execd_init_resources: Optional[ExecdInitResources] = None,
9684
):
9785
self.k8s_client = k8s_client
98-
self.custom_api = k8s_client.get_custom_objects_api()
99-
self.core_api = k8s_client.get_core_v1_api()
10086

10187
self.group = "agents.x-k8s.io"
10288
self.version = "v1alpha1"
10389
self.plural = "sandboxes"
10490

105-
self.shutdown_policy = shutdown_policy
106-
self.service_account = service_account
107-
self.template_manager = AgentSandboxTemplateManager(template_file_path)
108-
self.ingress_config = ingress_config
109-
self.execd_init_resources = execd_init_resources
110-
self._enable_informer = enable_informer
111-
self._informer_factory = informer_factory or (
112-
lambda ns: WorkloadInformer(
113-
custom_api=self.custom_api,
114-
group=self.group,
115-
version=self.version,
116-
plural=self.plural,
117-
namespace=ns,
118-
resync_period_seconds=informer_resync_seconds,
119-
watch_timeout_seconds=informer_watch_timeout_seconds,
120-
)
91+
k8s_config = app_config.kubernetes if app_config else None
92+
agent_config = app_config.agent_sandbox if app_config else None
93+
94+
self.shutdown_policy = agent_config.shutdown_policy if agent_config else "Delete"
95+
self.service_account = k8s_config.service_account if k8s_config else None
96+
self.template_manager = AgentSandboxTemplateManager(
97+
agent_config.template_file if agent_config else None
12198
)
122-
self._informers: Dict[str, WorkloadInformer] = {}
123-
self._informers_lock = Lock()
99+
self.ingress_config = app_config.ingress if app_config else None
100+
self.execd_init_resources = k8s_config.execd_init_resources if k8s_config else None
124101

125102
# Initialize secure runtime resolver
126103
self.resolver = SecureRuntimeResolver(app_config) if app_config else None
@@ -158,14 +135,7 @@ def create_workload(
158135
egress_image: Optional[str] = None,
159136
volumes: Optional[List[Volume]] = None,
160137
) -> Dict[str, Any]:
161-
# Pool mode does not support volumes
162-
if extensions and extensions.get("poolRef"):
163-
if volumes:
164-
raise ValueError(
165-
"Pool mode does not support volumes. "
166-
"Remove 'volumes' from request or use template mode."
167-
)
168-
138+
"""Create an agent-sandbox Sandbox CRD workload."""
169139
if self.runtime_class:
170140
logger.info(
171141
"Using Kubernetes RuntimeClass '%s' for sandbox %s",
@@ -215,21 +185,14 @@ def create_workload(
215185

216186
sandbox = self.template_manager.merge_with_runtime_values(runtime_manifest)
217187

218-
created = self.custom_api.create_namespaced_custom_object(
188+
created = self.k8s_client.create_custom_object(
219189
group=self.group,
220190
version=self.version,
221191
namespace=namespace,
222192
plural=self.plural,
223193
body=sandbox,
224194
)
225195

226-
informer = self._get_informer(namespace)
227-
if informer:
228-
try:
229-
informer.update_cache(created)
230-
except Exception as exc: # pragma: no cover - defensive
231-
logger.warning("Failed to update informer cache for %s: %s", sandbox_id, exc)
232-
233196
return {
234197
"name": created["metadata"]["name"],
235198
"uid": created["metadata"]["uid"],
@@ -245,6 +208,7 @@ def _build_pod_spec(
245208
network_policy: Optional[NetworkPolicy] = None,
246209
egress_image: Optional[str] = None,
247210
) -> Dict[str, Any]:
211+
"""Build pod spec dict for the Sandbox CRD."""
248212
init_container = self._build_execd_init_container(execd_image)
249213
main_container = self._build_main_container(
250214
image_spec=image_spec,
@@ -284,6 +248,7 @@ def _build_pod_spec(
284248
return pod_spec
285249

286250
def _build_execd_init_container(self, execd_image: str) -> V1Container:
251+
"""Build init container that copies execd binary to the shared volume."""
287252
script = (
288253
"cp ./execd /opt/opensandbox/bin/execd && "
289254
"cp ./bootstrap.sh /opt/opensandbox/bin/bootstrap.sh && "
@@ -359,6 +324,7 @@ def _build_main_container(
359324
)
360325

361326
def _container_to_dict(self, container: V1Container) -> Dict[str, Any]:
327+
"""Convert a V1Container object to a plain dict for CRD body."""
362328
result: Dict[str, Any] = {
363329
"name": container.name,
364330
"image": container.image,
@@ -388,70 +354,30 @@ def _container_to_dict(self, container: V1Container) -> Dict[str, Any]:
388354

389355
return result
390356

391-
def _get_informer(self, namespace: str) -> Optional[WorkloadInformer]:
392-
if not self._enable_informer:
393-
return None
394-
395-
with self._informers_lock:
396-
informer = self._informers.get(namespace)
397-
if informer is None:
398-
informer = self._informer_factory(namespace)
399-
self._informers[namespace] = informer
400-
try:
401-
informer.start()
402-
except Exception as exc: # pragma: no cover - defensive
403-
logger.warning(
404-
"Failed to start informer for namespace %s: %s", namespace, exc
405-
)
406-
self._informers.pop(namespace, None)
407-
return None
408-
return informer
409-
410357
def get_workload(self, sandbox_id: str, namespace: str) -> Optional[Dict[str, Any]]:
411-
informer = self._get_informer(namespace)
412-
cache_ready = informer.has_synced if informer else False
413-
358+
"""Get Sandbox CRD by sandbox ID, trying all candidate resource names."""
414359
candidates = self._resource_name_candidates(sandbox_id)
415360

416-
if informer and cache_ready:
417-
for name in candidates:
418-
cached = informer.get(name)
419-
if cached:
420-
return cached
421-
422-
if informer and not cache_ready:
423-
logger.warning(
424-
f"Informer cache not synced for namespace {namespace}; falling back to direct API get."
425-
)
426-
427361
for name in candidates:
428-
try:
429-
workload = self.custom_api.get_namespaced_custom_object(
430-
group=self.group,
431-
version=self.version,
432-
namespace=namespace,
433-
plural=self.plural,
434-
name=name,
435-
)
436-
if informer and workload:
437-
informer.update_cache(workload)
362+
workload = self.k8s_client.get_custom_object(
363+
group=self.group,
364+
version=self.version,
365+
namespace=namespace,
366+
plural=self.plural,
367+
name=name,
368+
)
369+
if workload:
438370
return workload
439-
except ApiException as e:
440-
if e.status != 404:
441-
logger.error(f"Unexpected error getting Sandbox for {sandbox_id}: {e}")
442-
raise
443-
except Exception as e:
444-
logger.error(f"Unexpected error getting Sandbox for {sandbox_id}: {e}")
445-
raise
446371

447372
return None
448373

449374
def delete_workload(self, sandbox_id: str, namespace: str) -> None:
375+
"""Delete the Sandbox CRD for the given sandbox ID."""
450376
sandbox = self.get_workload(sandbox_id, namespace)
451377
if not sandbox:
452378
raise Exception(f"Sandbox for sandbox {sandbox_id} not found")
453379

454-
self.custom_api.delete_namespaced_custom_object(
380+
self.k8s_client.delete_custom_object(
455381
group=self.group,
456382
version=self.version,
457383
namespace=namespace,
@@ -461,24 +387,17 @@ def delete_workload(self, sandbox_id: str, namespace: str) -> None:
461387
)
462388

463389
def list_workloads(self, namespace: str, label_selector: str) -> List[Dict[str, Any]]:
464-
try:
465-
sandbox_list = self.custom_api.list_namespaced_custom_object(
466-
group=self.group,
467-
version=self.version,
468-
namespace=namespace,
469-
plural=self.plural,
470-
label_selector=label_selector,
471-
)
472-
return sandbox_list.get("items", [])
473-
except ApiException as e:
474-
if e.status == 404:
475-
return []
476-
raise
477-
except Exception as e:
478-
logger.error(f"Unexpected error listing Sandboxes: {e}")
479-
raise
390+
"""List Sandbox CRDs matching the given label selector."""
391+
return self.k8s_client.list_custom_objects(
392+
group=self.group,
393+
version=self.version,
394+
namespace=namespace,
395+
plural=self.plural,
396+
label_selector=label_selector,
397+
)
480398

481399
def update_expiration(self, sandbox_id: str, namespace: str, expires_at: datetime) -> None:
400+
"""Patch the Sandbox CRD shutdownTime field."""
482401
sandbox = self.get_workload(sandbox_id, namespace)
483402
if not sandbox:
484403
raise Exception(f"Sandbox for sandbox {sandbox_id} not found")
@@ -489,7 +408,7 @@ def update_expiration(self, sandbox_id: str, namespace: str, expires_at: datetim
489408
}
490409
}
491410

492-
self.custom_api.patch_namespaced_custom_object(
411+
self.k8s_client.patch_custom_object(
493412
group=self.group,
494413
version=self.version,
495414
namespace=namespace,
@@ -499,6 +418,7 @@ def update_expiration(self, sandbox_id: str, namespace: str, expires_at: datetim
499418
)
500419

501420
def get_expiration(self, workload: Dict[str, Any]) -> Optional[datetime]:
421+
"""Parse shutdownTime from Sandbox CRD spec."""
502422
spec = workload.get("spec", {})
503423
shutdown_time_str = spec.get("shutdownTime")
504424

@@ -508,10 +428,11 @@ def get_expiration(self, workload: Dict[str, Any]) -> Optional[datetime]:
508428
try:
509429
return datetime.fromisoformat(shutdown_time_str.replace("Z", "+00:00"))
510430
except (ValueError, TypeError) as e:
511-
logger.warning(f"Invalid shutdownTime format: {shutdown_time_str}, error: {e}")
431+
logger.warning("Invalid shutdownTime format: %s, error: %s", shutdown_time_str, e)
512432
return None
513433

514434
def get_status(self, workload: Dict[str, Any]) -> Dict[str, Any]:
435+
"""Derive sandbox state from the Sandbox CRD status conditions."""
515436
status = workload.get("status", {})
516437
conditions = status.get("conditions", [])
517438

@@ -577,10 +498,10 @@ def _pod_state_from_selector(self, workload: Dict[str, Any]) -> Optional[tuple[s
577498
return None
578499

579500
try:
580-
pods = self.core_api.list_namespaced_pod(
501+
pods = self.k8s_client.list_pods(
581502
namespace=namespace,
582503
label_selector=selector,
583-
).items
504+
)
584505
except Exception:
585506
return None
586507

@@ -620,15 +541,15 @@ def get_endpoint_info(self, workload: Dict[str, Any], port: int, sandbox_id: str
620541
namespace = workload.get("metadata", {}).get("namespace")
621542
if selector and namespace:
622543
try:
623-
pods = self.core_api.list_namespaced_pod(
544+
pods = self.k8s_client.list_pods(
624545
namespace=namespace,
625546
label_selector=selector,
626-
).items
547+
)
627548
for pod in pods:
628549
if pod.status and pod.status.pod_ip and pod.status.phase == "Running":
629550
return Endpoint(endpoint=f"{pod.status.pod_ip}:{port}")
630551
except Exception as e:
631-
logger.warning(f"Failed to resolve pod endpoint: {e}")
552+
logger.warning("Failed to resolve pod endpoint: %s", e)
632553

633554
service_fqdn = status.get("serviceFQDN")
634555
if service_fqdn:

0 commit comments

Comments
 (0)