diff --git a/.github/workflows/docker-build.yml b/.github/workflows/docker-build.yml index a36e46484..78d3c2e63 100644 --- a/.github/workflows/docker-build.yml +++ b/.github/workflows/docker-build.yml @@ -114,6 +114,9 @@ jobs: - name: openspiel-env dockerfile: envs/openspiel_env/server/Dockerfile context: envs/openspiel_env + - name: cloud-sre-env + dockerfile: envs/cloud_sre_env/server/Dockerfile + context: . steps: - name: Checkout code diff --git a/envs/cloud_sre_env/README.md b/envs/cloud_sre_env/README.md new file mode 100644 index 000000000..8925a10fb --- /dev/null +++ b/envs/cloud_sre_env/README.md @@ -0,0 +1,138 @@ +# Cloud SRE & FinOps Environment + +An [OpenEnv](https://github.com/meta-pytorch/OpenEnv)-compliant environment for training and evaluating AI agents on **Cloud Site Reliability Engineering (SRE)** and **Financial Operations (FinOps)** tasks. + +The agent manages a simulated cloud infrastructure: diagnosing outages, terminating idle resources, scaling services, and optimizing costs — all without causing collateral damage to production workloads. + +## Features + +| Feature | Description | +|---|---| +| **3 Difficulty Tiers** | Easy → Medium → Hard tasks covering cost optimization, scaling, and incident response | +| **Deterministic Grading** | Fine-grained scoring breakdowns for agent debugging | +| **Seeded Procedural Gen** | Reproducible yet varied infrastructure layouts for RL training | +| **Chaos Injection** | Random cost spikes, CPU anomalies, and spurious alerts | +| **Zero Dependencies** | No external API keys or cloud accounts needed | + +## Tasks + +### 1. Phantom Volume Cleanup (Easy) +- **Goal:** Find and terminate unattached EBS volumes wasting money +- **Trap:** Do NOT touch running EC2 instances or in-use volumes +- **Scoring:** +1/N per orphan terminated, −0.5 per active resource destroyed + +### 2. Latency Spike Remediation (Medium) +- **Goal:** Scale an under-provisioned RDS to fix high API latency +- **Trap:** Stay within the budget limit +- **Scoring:** 40% RDS scaled + 30% latency resolved + 30% under budget + +### 3. Noisy Neighbor Incident (Hard) +- **Goal:** Investigate a rogue test instance, terminate it, reboot crashed production +- **Trap:** Don't terminate production infrastructure +- **Scoring:** 20% inspect + 30% terminate rogue + 30% reboot backend + 20% alerts resolved + +## Quick Start + +### From Docker +```bash +# Build base image +docker build -t openenv-base:latest -f src/openenv/core/containers/images/Dockerfile . + +# Build Cloud SRE environment +docker build -t cloud-sre-env:latest -f envs/cloud_sre_env/server/Dockerfile . +``` + +### Using the Client +```python +from envs.cloud_sre_env import SREAction, CloudSREEnv + +# Connect to the Docker container +client = CloudSREEnv.from_docker_image("cloud-sre-env:latest") + +# Reset to a task +result = client.reset() + +# Take actions +result = client.step(SREAction(command="inspect", resource_id="ec2-web-001")) +result = client.step(SREAction(command="terminate", resource_id="ebs-orphan-001")) + +# Get state +state = client.state() +print(f"Step: {state.current_step}, Reward: {state.cumulative_reward}") + +# Cleanup +client.close() +``` + +### Direct Python Usage (no Docker) +```python +from cloud_sre_env.server.cloud_sre_environment import CloudSREEnvironment +from cloud_sre_env.models import SREAction + +env = CloudSREEnvironment() + +# Reset with seeded procedural generation +obs = env.reset(task_id="phantom_volume_cleanup", seed=42) +print(f"Resources: {len(obs.resources)}, Alerts: {len(obs.alerts)}") + +# Agent loop +for step in range(15): + action = SREAction(command="terminate", resource_id="ebs-orphan-001") + obs = env.step(action) + if env.state.done: + break + +# Grade the agent +score, breakdown = env.grade() +print(f"Score: {score}, Breakdown: {breakdown}") +``` + +## Action Space + +| Command | Description | Parameters | +|---------|-------------|------------| +| `terminate` | Remove a resource permanently | `resource_id` | +| `scale` | Change instance size | `resource_id`, `params.target_size` | +| `reboot` | Restart a stopped/running instance | `resource_id` | +| `inspect` | View detailed resource info (no side-effects) | `resource_id` | +| `wait` | Do nothing for this step | — | + +## Observation Space + +```json +{ + "resources": [{"id": "ec2-web-001", "type": "ec2_instance", "status": "running", ...}], + "alerts": [{"alert_id": "alert-cost-001", "severity": "warning", "message": "..."}], + "total_hourly_cost": 4.52, + "system_uptime": 78.0, + "budget_limit": 12.00, + "task_description": "Your cloud account has unattached EBS volumes..." +} +``` + +## Project Structure + +``` +cloud_sre_env/ +├── __init__.py # Exports SREAction, SREObservation, SREState, CloudSREEnv +├── models.py # Typed dataclass models (Action, Observation, State) +├── client.py # EnvClient subclass for HTTP/Docker communication +├── openenv.yaml # Environment manifest +├── README.md # This file +└── server/ + ├── __init__.py + ├── cloud_sre_environment.py # Core environment logic + 3 tasks + grading + ├── app.py # FastAPI application + ├── Dockerfile # Container image + └── requirements.txt # Server dependencies +``` + +## Testing + +```bash +pytest tests/test_cloud_sre_environment.py -v +``` + +## License + +BSD-3-Clause — same as OpenEnv. diff --git a/envs/cloud_sre_env/__init__.py b/envs/cloud_sre_env/__init__.py new file mode 100644 index 000000000..0c325313c --- /dev/null +++ b/envs/cloud_sre_env/__init__.py @@ -0,0 +1,12 @@ +""" +Cloud SRE & FinOps Environment for OpenEnv. + +An OpenEnv-compliant environment simulating Cloud SRE operations: +diagnosing outages, terminating idle resources, scaling services, +and optimizing costs without causing collateral damage. +""" + +from .models import SREAction, SREObservation, SREState +from .client import CloudSREEnv + +__all__ = ["SREAction", "SREObservation", "SREState", "CloudSREEnv"] diff --git a/envs/cloud_sre_env/client.py b/envs/cloud_sre_env/client.py new file mode 100644 index 000000000..cdda41b6f --- /dev/null +++ b/envs/cloud_sre_env/client.py @@ -0,0 +1,65 @@ +""" +Cloud SRE OpenEnv Client. + +Implements EnvClient for communicating with the Cloud SRE environment +via WebSocket/HTTP when deployed as a Docker container or HF Space. +""" + +from openenv.core import EnvClient, StepResult +from .models import SREAction, SREObservation, SREState + + +class CloudSREEnv(EnvClient[SREAction, SREObservation, SREState]): + """ + Client for the Cloud SRE & FinOps environment. + + Usage (async): + async with CloudSREEnv(base_url="https://your-space.hf.space") as client: + result = await client.reset() + result = await client.step(SREAction(command="inspect", resource_id="ec2-web-001")) + + Usage (sync): + with CloudSREEnv(base_url="...").sync() as client: + result = client.reset() + result = client.step(SREAction(command="terminate", resource_id="ebs-orphan-001")) + """ + + def _step_payload(self, action: SREAction) -> dict: + """Serialize an SREAction into the JSON payload for the server.""" + payload = {"command": action.command} + if action.resource_id: + payload["resource_id"] = action.resource_id + if action.params: + payload["params"] = action.params + return payload + + def _parse_result(self, payload: dict) -> StepResult[SREObservation]: + """Deserialize the server response into a typed StepResult.""" + obs_data = payload.get("observation", {}) + obs = SREObservation( + resources=obs_data.get("resources", []), + alerts=obs_data.get("alerts", []), + total_hourly_cost=obs_data.get("total_hourly_cost", 0.0), + system_uptime=obs_data.get("system_uptime", 100.0), + step_number=obs_data.get("step_number", 0), + max_steps=obs_data.get("max_steps", 15), + budget_limit=obs_data.get("budget_limit"), + task_description=obs_data.get("task_description", ""), + ) + return StepResult( + observation=obs, + reward=payload.get("reward", 0.0), + done=payload.get("done", False), + ) + + def _parse_state(self, payload: dict) -> SREState: + """Deserialize the server state response into a typed SREState.""" + return SREState( + episode_id=payload.get("episode_id", ""), + step_count=payload.get("step_count", 0), + task_id=payload.get("task_id", ""), + current_step=payload.get("current_step", 0), + done=payload.get("done", False), + cumulative_reward=payload.get("cumulative_reward", 0.0), + action_count=payload.get("action_count", 0), + ) diff --git a/envs/cloud_sre_env/models.py b/envs/cloud_sre_env/models.py new file mode 100644 index 000000000..c59ff76b5 --- /dev/null +++ b/envs/cloud_sre_env/models.py @@ -0,0 +1,124 @@ +""" +Typed models for the Cloud SRE OpenEnv environment. + +Follows OpenEnv conventions: Action, Observation, State as dataclasses +inheriting from openenv.core base classes. +""" + +from dataclasses import dataclass, field +from typing import Dict, List, Optional +from enum import Enum + +from openenv.core.env_server import Action, Observation, State + + +# ─── Enums ──────────────────────────────────────────────────────────────────── + + +class ResourceType(str, Enum): + """Types of cloud resources available in the simulation.""" + EC2 = "ec2_instance" + RDS = "rds_database" + EBS = "ebs_volume" + ALB = "alb_load_balancer" + + +class ResourceStatus(str, Enum): + """Possible statuses for a cloud resource.""" + RUNNING = "running" + STOPPED = "stopped" + AVAILABLE = "available" # EBS: unattached + IN_USE = "in-use" # EBS: attached + REBOOTING = "rebooting" + TERMINATED = "terminated" + + +class AlertSeverity(str, Enum): + """Severity levels for monitoring alerts.""" + INFO = "info" + WARNING = "warning" + CRITICAL = "critical" + + +class ActionCommand(str, Enum): + """Discrete commands the agent can issue.""" + TERMINATE = "terminate" + SCALE = "scale" + REBOOT = "reboot" + INSPECT = "inspect" + WAIT = "wait" + + +# ─── Data Structures ───────────────────────────────────────────────────────── + + +@dataclass +class ResourceInfo: + """Represents a single cloud resource (EC2, RDS, EBS, ALB).""" + id: str + name: str = "" + type: str = "" # ResourceType value + status: str = "" # ResourceStatus value + instance_size: str = "" # e.g., "t3.micro", "db.t3.medium" + cpu_utilization: float = 0.0 + memory_utilization: float = 0.0 + cost_per_hour: float = 0.0 + attached_to: Optional[str] = None + tags: Dict[str, str] = field(default_factory=dict) + + +@dataclass +class AlertInfo: + """Represents an active monitoring alert.""" + alert_id: str + severity: str = "info" + message: str = "" + resource_id: Optional[str] = None + metric_name: Optional[str] = None + metric_value: Optional[float] = None + + +# ─── OpenEnv Action / Observation / State ──────────────────────────────────── + + +from pydantic import Field + +class SREAction(Action): + """ + An action the agent submits to step(). + + Attributes: + command: One of 'terminate', 'scale', 'reboot', 'inspect', 'wait'. + resource_id: The target resource ID (optional for 'wait'). + params: Additional parameters, e.g. {"target_size": "db.t3.medium"}. + """ + command: str = "wait" + resource_id: Optional[str] = None + params: Dict[str, str] = Field(default_factory=dict) + + +class SREObservation(Observation): + """ + The full observation returned by state() and step(). + Contains everything the agent can see about the infrastructure. + """ + resources: List[dict] = Field(default_factory=list) + alerts: List[dict] = Field(default_factory=list) + total_hourly_cost: float = 0.0 + system_uptime: float = 100.0 + step_number: int = 0 + max_steps: int = 15 + budget_limit: Optional[float] = None + task_description: str = "" + + +class SREState(State): + """ + Episode state tracking for the Cloud SRE environment. + Extends the base State with SRE-specific metadata. + """ + task_id: str = "" + current_step: int = 0 + done: bool = False + cumulative_reward: float = 0.0 + action_count: int = 0 diff --git a/envs/cloud_sre_env/openenv.yaml b/envs/cloud_sre_env/openenv.yaml new file mode 100644 index 000000000..97801c008 --- /dev/null +++ b/envs/cloud_sre_env/openenv.yaml @@ -0,0 +1,63 @@ +name: "cloud-sre-env" +version: "2.0.0" +description: > + An OpenEnv environment simulating Cloud SRE & FinOps operations. + An AI agent manages a simulated cloud infrastructure: diagnosing outages, + terminating idle resources, scaling services, and optimizing costs + without causing collateral damage to production workloads. + + Features: + - 3 difficulty-tiered tasks (easy → hard) with deterministic grading + - Seeded procedural generation for reproducible RL training + - Chaos event injection for robustness testing + - Fine-grained scoring breakdowns for agent debugging + +author: "naveenkumar982" +license: "BSD-3-Clause" + +entrypoint: "cloud_sre_env.server.cloud_sre_environment:CloudSREEnvironment" + +tasks: + - id: "phantom_volume_cleanup" + name: "Phantom Volume Cleanup" + difficulty: "easy" + description: "Identify and terminate unattached, idle EBS volumes wasting money without touching active resources." + + - id: "latency_spike_remediation" + name: "Latency Spike Remediation" + difficulty: "medium" + description: "Scale up an under-provisioned RDS database to fix high API latency, within a budget constraint." + + - id: "noisy_neighbor_incident" + name: "Noisy Neighbor Incident" + difficulty: "hard" + description: "Identify a rogue test EC2 instance, terminate it, and reboot the crashed production backend." + +action_space: + type: "discrete" + commands: + - "terminate" + - "scale" + - "reboot" + - "inspect" + - "wait" + +observation_space: + type: "structured" + fields: + - "resources: List[ResourceInfo]" + - "alerts: List[AlertInfo]" + - "total_hourly_cost: float" + - "system_uptime: float (0-100)" + - "budget_limit: Optional[float]" + - "task_description: str" + +reward_range: [-1.0, 1.0] +max_steps: 15 + +dependencies: + - "openenv-core>=0.2.0" + +deployment: + platform: "huggingface-spaces" + dockerfile: "server/Dockerfile" diff --git a/envs/cloud_sre_env/server/Dockerfile b/envs/cloud_sre_env/server/Dockerfile new file mode 100644 index 000000000..b8d984da3 --- /dev/null +++ b/envs/cloud_sre_env/server/Dockerfile @@ -0,0 +1,18 @@ +# Accept base image as build argument for CI/CD flexibility +ARG BASE_IMAGE=openenv-base:latest +FROM ${BASE_IMAGE} + +# Install dependencies +COPY envs/cloud_sre_env/server/requirements.txt /tmp/requirements.txt +RUN pip install --no-cache-dir -r /tmp/requirements.txt && rm /tmp/requirements.txt + +# Copy environment code +COPY src/openenv/core/ /app/src/openenv/core/ +COPY envs/cloud_sre_env/ /app/envs/cloud_sre_env/ + +# Health check +HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \ + CMD curl -f http://localhost:8000/health || exit 1 + +# Run server +CMD ["uvicorn", "envs.cloud_sre_env.server.app:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/envs/cloud_sre_env/server/__init__.py b/envs/cloud_sre_env/server/__init__.py new file mode 100644 index 000000000..2c6b67d0c --- /dev/null +++ b/envs/cloud_sre_env/server/__init__.py @@ -0,0 +1 @@ +# server package diff --git a/envs/cloud_sre_env/server/app.py b/envs/cloud_sre_env/server/app.py new file mode 100644 index 000000000..c88e3a0d9 --- /dev/null +++ b/envs/cloud_sre_env/server/app.py @@ -0,0 +1,11 @@ +""" +FastAPI application for the Cloud SRE Environment. +Uses the OpenEnv create_fastapi_app helper. +""" + +from openenv.core.env_server import create_fastapi_app +from ..models import SREAction, SREObservation +from .cloud_sre_environment import CloudSREEnvironment + +env = CloudSREEnvironment() +app = create_fastapi_app(env, SREAction, SREObservation) diff --git a/envs/cloud_sre_env/server/cloud_sre_environment.py b/envs/cloud_sre_env/server/cloud_sre_environment.py new file mode 100644 index 000000000..cb130a196 --- /dev/null +++ b/envs/cloud_sre_env/server/cloud_sre_environment.py @@ -0,0 +1,790 @@ +""" +Cloud SRE Environment — Server-Side Implementation. + +Implements the core Environment with reset(), step(), and state() methods +following the OpenEnv specification. + +Supports: +- 3 difficulty-tiered Cloud SRE tasks (easy, medium, hard) +- Seeded procedural generation for RL training +- Chaos event injection for robustness testing +- Deterministic grading with fine-grained scoring breakdowns +""" + +import copy +import random +import uuid +from typing import Dict, Any, List, Optional, Tuple, Set +from dataclasses import asdict + +from openenv.core.env_server import Environment +from ..models import ( + SREAction, SREObservation, SREState, + ResourceType, ResourceStatus, AlertSeverity, ActionCommand, + ResourceInfo, AlertInfo, +) + + +# ── Helper: Name generators ──────────────────────────────────────────────── + +_ADJECTIVES = ["old", "legacy", "temp", "stale", "orphan", "unused", "leftover", "backup", "scratch", "test"] +_EC2_ROLES = ["web", "api", "worker", "cache", "monitor", "proxy", "gateway", "scheduler", "indexer", "renderer"] +_EBS_NOTES = ["migration-2024", "snapshot-leftover", "backup-failed", "scratch-disk", "dev-test", "canary-old"] +_ENVS_DECOY = ["deprecated", "test", "dev", "staging", "sandbox"] +_SIZES_EC2 = ["t3.nano", "t3.micro", "t3.small", "t3.medium", "t3.large", "t3.xlarge"] +_SIZES_LARGE = ["c5.xlarge", "c5.2xlarge", "c5.4xlarge", "m5.xlarge", "m5.2xlarge"] + +RDS_PRICING = { + "db.t3.micro": 0.017, "db.t3.small": 0.034, + "db.t3.medium": 0.068, "db.t3.large": 0.136, + "db.t3.xlarge": 0.272, +} + +CHAOS_EVENTS = [ + { + "type": "new_alert", + "alert": { + "alert_id": "chaos-cost-spike", + "severity": "warning", + "message": "Unexpected S3 egress cost spike detected: +$0.45/hr from cross-region transfers.", + "metric_name": "S3EgressCost", + "metric_value": 0.45, + }, + }, + { + "type": "cpu_spike", + "description": "A random running instance's CPU spikes to 92%", + }, + { + "type": "new_alert", + "alert": { + "alert_id": "chaos-disk-warn", + "severity": "warning", + "message": "Disk utilization on a data volume has reached 85%. Consider expanding storage.", + "metric_name": "DiskUtilization", + "metric_value": 85.0, + }, + }, + { + "type": "cost_drift", + "description": "Total cost drifts up slightly due to network egress charges", + }, +] + + +def _rand_id(rng: random.Random, prefix: str, width: int = 3) -> str: + return f"{prefix}-{rng.randint(10**(width-1), 10**width - 1)}" + + +# ═══════════════════════════════════════════════════════════════════════════════ +# TASK DEFINITIONS +# ═══════════════════════════════════════════════════════════════════════════════ + + +class Task1PhantomVolumeCleanup: + """ + EASY: Identify and terminate unattached EBS volumes wasting money. + Agent must NOT touch running instances or in-use volumes. + """ + TASK_ID = "phantom_volume_cleanup" + DIFFICULTY = "easy" + DESCRIPTION = ( + "Your cloud account has unattached EBS volumes that are not connected " + "to any instance but still incur charges. Identify and terminate them " + "to reduce costs. Do NOT touch any running instances or in-use volumes." + ) + + def __init__(self): + self.orphan_ids: Set[str] = set() + + def get_initial_state(self, seed: Optional[int] = None) -> Dict[str, Any]: + if seed is None: + self.orphan_ids = {"ebs-orphan-001", "ebs-orphan-002", "ebs-orphan-003"} + return self._get_fixed_state() + + rng = random.Random(seed) + num_orphans = rng.randint(2, 5) + num_ec2 = rng.randint(3, 6) + num_inuse_ebs = rng.randint(1, 3) + + resources = [] + + # Active EC2 instances (decoys) + ec2_ids = [] + for i in range(num_ec2): + role = rng.choice(_EC2_ROLES) + size = rng.choice(_SIZES_EC2) + eid = _rand_id(rng, f"ec2-{role}") + ec2_ids.append(eid) + resources.append(asdict(ResourceInfo( + id=eid, name=f"{role}-server-{i+1}", + type=ResourceType.EC2.value, status=ResourceStatus.RUNNING.value, + instance_size=size, + cpu_utilization=round(rng.uniform(10, 75), 1), + memory_utilization=round(rng.uniform(20, 80), 1), + cost_per_hour=round(rng.uniform(0.02, 0.12), 4), + tags={"env": "prod", "role": role}, + ))) + + # In-use EBS volumes (decoys) + for i in range(num_inuse_ebs): + attached = rng.choice(ec2_ids) if ec2_ids else "ec2-unknown" + resources.append(asdict(ResourceInfo( + id=_rand_id(rng, "ebs-data"), name=f"data-vol-{i+1}", + type=ResourceType.EBS.value, status=ResourceStatus.IN_USE.value, + cost_per_hour=round(rng.uniform(0.05, 0.15), 4), + attached_to=attached, tags={"env": "prod"}, + ))) + + # ORPHAN EBS volumes (targets) + for i in range(num_orphans): + oid = _rand_id(rng, "ebs-orphan") + self.orphan_ids.add(oid) + resources.append(asdict(ResourceInfo( + id=oid, name=f"{rng.choice(_ADJECTIVES)}-vol-{i+1}", + type=ResourceType.EBS.value, status=ResourceStatus.AVAILABLE.value, + cost_per_hour=round(rng.uniform(0.50, 2.50), 2), + attached_to=None, + tags={"env": rng.choice(_ENVS_DECOY), "note": rng.choice(_EBS_NOTES)}, + ))) + + rng.shuffle(resources) + total_waste = sum(r["cost_per_hour"] for r in resources + if r["type"] == ResourceType.EBS.value and r["status"] == ResourceStatus.AVAILABLE.value) + total_cost = sum(r["cost_per_hour"] for r in resources) + + alerts = [asdict(AlertInfo( + alert_id="alert-cost-001", severity=AlertSeverity.WARNING.value, + message=f"Monthly cost projection exceeds budget. " + f"{num_orphans} unattached EBS volumes detected (${total_waste:.2f}/hr waste).", + metric_name="CostAnomaly", metric_value=round(total_waste, 2), + ))] + + return { + "resources": resources, "alerts": alerts, + "total_hourly_cost": round(total_cost, 4), + "system_uptime": 100.0, "budget_limit": None, + } + + def _get_fixed_state(self) -> Dict[str, Any]: + resources = [ + asdict(ResourceInfo(id="ec2-web-001", name="web-server-1", type=ResourceType.EC2.value, + status=ResourceStatus.RUNNING.value, instance_size="t3.medium", + cpu_utilization=45.0, memory_utilization=62.0, + cost_per_hour=0.0416, tags={"env": "prod", "role": "web"})), + asdict(ResourceInfo(id="ec2-web-002", name="web-server-2", type=ResourceType.EC2.value, + status=ResourceStatus.RUNNING.value, instance_size="t3.medium", + cpu_utilization=38.0, memory_utilization=55.0, + cost_per_hour=0.0416, tags={"env": "prod", "role": "web"})), + asdict(ResourceInfo(id="ec2-api-001", name="api-server-1", type=ResourceType.EC2.value, + status=ResourceStatus.RUNNING.value, instance_size="t3.large", + cpu_utilization=60.0, memory_utilization=70.0, + cost_per_hour=0.0832, tags={"env": "prod", "role": "api"})), + asdict(ResourceInfo(id="ebs-data-001", name="web-data-vol", type=ResourceType.EBS.value, + status=ResourceStatus.IN_USE.value, cost_per_hour=0.10, + attached_to="ec2-web-001", tags={"env": "prod"})), + asdict(ResourceInfo(id="ebs-orphan-001", name="old-migration-vol", type=ResourceType.EBS.value, + status=ResourceStatus.AVAILABLE.value, cost_per_hour=1.40, + tags={"env": "deprecated", "note": "migration-2024"})), + asdict(ResourceInfo(id="ebs-orphan-002", name="test-snapshot-vol", type=ResourceType.EBS.value, + status=ResourceStatus.AVAILABLE.value, cost_per_hour=1.40, + tags={"env": "test", "note": "snapshot-leftover"})), + asdict(ResourceInfo(id="ebs-orphan-003", name="dev-scratch-vol", type=ResourceType.EBS.value, + status=ResourceStatus.AVAILABLE.value, cost_per_hour=1.40, + tags={"env": "dev", "note": "scratch-disk"})), + ] + alerts = [asdict(AlertInfo( + alert_id="alert-cost-001", severity=AlertSeverity.WARNING.value, + message="Monthly cost projection exceeds budget by 15%. " + "3 unattached EBS volumes detected ($4.20/hr waste).", + metric_name="CostAnomaly", metric_value=4.20, + ))] + total_cost = sum(r["cost_per_hour"] for r in resources) + return { + "resources": resources, "alerts": alerts, + "total_hourly_cost": round(total_cost, 4), + "system_uptime": 100.0, "budget_limit": None, + } + + def grade(self, action_history: List[Dict], final_state: Dict, initial_state: Dict) -> Tuple[float, Dict]: + score = 0.0 + breakdown = { + "orphans_terminated": [], "active_resources_terminated": [], + "total_orphans": len(self.orphan_ids), "penalties": 0.0, + } + terminated_ids = {e["resource_id"] for e in action_history + if e.get("command") == "terminate" and e.get("resource_id")} + per_orphan = 1.0 / len(self.orphan_ids) if self.orphan_ids else 0.33 + for rid in terminated_ids: + if rid in self.orphan_ids: + score += per_orphan + breakdown["orphans_terminated"].append(rid) + else: + score -= 0.50 + breakdown["active_resources_terminated"].append(rid) + breakdown["penalties"] += 0.50 + + score = round(max(0.0, min(1.0, score)), 2) + if (len(breakdown["orphans_terminated"]) == len(self.orphan_ids) + and not breakdown["active_resources_terminated"]): + score = 1.0 + return score, breakdown + + +class Task2LatencySpikeRemediation: + """ + MEDIUM: Scale an under-provisioned RDS to fix high API latency, within budget. + """ + TASK_ID = "latency_spike_remediation" + DIFFICULTY = "medium" + DESCRIPTION = ( + "A critical latency alert has fired. Your RDS database is running on a " + "tiny instance and cannot handle the current load. Scale it up to fix " + "latency, but keep total hourly cost under the budget limit." + ) + BUDGET_LIMIT = 12.00 + + def __init__(self): + self.rds_id: str = "rds-primary-001" + + def get_initial_state(self, seed: Optional[int] = None) -> Dict[str, Any]: + if seed is None: + self.rds_id = "rds-primary-001" + return self._get_fixed_state() + + rng = random.Random(seed) + num_ec2 = rng.randint(3, 6) + resources = [] + + for i in range(num_ec2): + role = rng.choice(["app", "cache", "monitor", "proxy", "gateway"]) + resources.append(asdict(ResourceInfo( + id=_rand_id(rng, f"ec2-{role}"), name=f"{role}-server-{i+1}", + type=ResourceType.EC2.value, status=ResourceStatus.RUNNING.value, + instance_size=rng.choice(_SIZES_EC2[2:]), + cpu_utilization=round(rng.uniform(25, 75), 1), + memory_utilization=round(rng.uniform(30, 80), 1), + cost_per_hour=round(rng.uniform(0.04, 0.15), 4), + tags={"env": "prod", "role": role}, + ))) + + rds_id = _rand_id(rng, "rds-primary") + self.rds_id = rds_id + rds_cpu = round(rng.uniform(92, 99), 1) + resources.append(asdict(ResourceInfo( + id=rds_id, name="primary-db", type=ResourceType.RDS.value, + status=ResourceStatus.RUNNING.value, instance_size="db.t3.micro", + cpu_utilization=rds_cpu, memory_utilization=round(rng.uniform(88, 98), 1), + cost_per_hour=0.017, tags={"env": "prod", "role": "database"}, + ))) + + resources.append(asdict(ResourceInfo( + id=_rand_id(rng, "alb-main"), name="main-load-balancer", + type=ResourceType.ALB.value, status=ResourceStatus.RUNNING.value, + cost_per_hour=0.0225, tags={"env": "prod", "role": "lb"}, + ))) + + rng.shuffle(resources) + latency_val = round(rng.uniform(1800, 3000), 0) + alerts = [ + asdict(AlertInfo(alert_id="alert-latency-001", severity=AlertSeverity.CRITICAL.value, + message=f"API p99 latency has exceeded {latency_val:.0f}ms. " + f"Root cause: RDS '{rds_id}' (db.t3.micro) at {rds_cpu}% CPU.", + resource_id=rds_id, metric_name="P99Latency", metric_value=latency_val)), + asdict(AlertInfo(alert_id="alert-cpu-001", severity=AlertSeverity.WARNING.value, + message=f"RDS '{rds_id}' CPU utilization is at {rds_cpu}%.", + resource_id=rds_id, metric_name="CPUUtilization", metric_value=rds_cpu)), + ] + total_cost = sum(r["cost_per_hour"] for r in resources) + budget = round(total_cost + rng.uniform(3.0, 8.0), 2) + return { + "resources": resources, "alerts": alerts, + "total_hourly_cost": round(total_cost, 4), + "system_uptime": round(rng.uniform(65, 82), 1), + "budget_limit": budget, + } + + def _get_fixed_state(self) -> Dict[str, Any]: + resources = [ + asdict(ResourceInfo(id="ec2-app-001", name="app-server-1", type=ResourceType.EC2.value, + status=ResourceStatus.RUNNING.value, instance_size="t3.large", + cpu_utilization=72.0, memory_utilization=65.0, + cost_per_hour=0.0832, tags={"env": "prod", "role": "app"})), + asdict(ResourceInfo(id="ec2-app-002", name="app-server-2", type=ResourceType.EC2.value, + status=ResourceStatus.RUNNING.value, instance_size="t3.large", + cpu_utilization=68.0, memory_utilization=60.0, + cost_per_hour=0.0832, tags={"env": "prod", "role": "app"})), + asdict(ResourceInfo(id="rds-primary-001", name="primary-db", type=ResourceType.RDS.value, + status=ResourceStatus.RUNNING.value, instance_size="db.t3.micro", + cpu_utilization=98.0, memory_utilization=95.0, + cost_per_hour=0.017, tags={"env": "prod", "role": "database"})), + asdict(ResourceInfo(id="alb-main-001", name="main-load-balancer", type=ResourceType.ALB.value, + status=ResourceStatus.RUNNING.value, + cost_per_hour=0.0225, tags={"env": "prod", "role": "lb"})), + ] + alerts = [ + asdict(AlertInfo(alert_id="alert-latency-001", severity=AlertSeverity.CRITICAL.value, + message="API p99 latency has exceeded 2000ms. Root cause: RDS 'primary-db' at 98% CPU.", + resource_id="rds-primary-001", metric_name="P99Latency", metric_value=2150.0)), + ] + total_cost = sum(r["cost_per_hour"] for r in resources) + return { + "resources": resources, "alerts": alerts, + "total_hourly_cost": round(total_cost, 4), + "system_uptime": 78.0, "budget_limit": self.BUDGET_LIMIT, + } + + def grade(self, action_history: List[Dict], final_state: Dict, initial_state: Dict) -> Tuple[float, Dict]: + score = 0.0 + breakdown = {"rds_scaled": False, "under_budget": False, "alert_resolved": False, + "ec2s_terminated": [], "penalties": 0.0} + valid_sizes = ["db.t3.medium", "db.t3.large", "db.t3.xlarge"] + + for entry in action_history: + cmd, rid = entry.get("command"), entry.get("resource_id") + params = entry.get("params", {}) + if cmd == "scale" and rid == self.rds_id and params.get("target_size", "") in valid_sizes: + breakdown["rds_scaled"] = True + if cmd == "terminate" and rid and rid.startswith("ec2"): + breakdown["ec2s_terminated"].append(rid) + + if breakdown["rds_scaled"]: + score += 0.40 + breakdown["alert_resolved"] = True + score += 0.30 + budget = initial_state.get("budget_limit", self.BUDGET_LIMIT) + if final_state.get("total_hourly_cost", 999) <= budget: + breakdown["under_budget"] = True + score += 0.30 + for _ in breakdown["ec2s_terminated"]: + score -= 0.30 + breakdown["penalties"] += 0.30 + + return round(max(0.0, min(1.0, score)), 2), breakdown + + +class Task3NoisyNeighborIncident: + """ + HARD: A rogue test EC2 instance has crashed a prod backend. + Agent must investigate, terminate the rogue, and reboot production. + """ + TASK_ID = "noisy_neighbor_incident" + DIFFICULTY = "hard" + DESCRIPTION = ( + "CRITICAL INCIDENT: A rogue EC2 instance (tagged env:test) is consuming " + "excessive resources and has caused the production backend to crash. " + "Investigate, terminate the rogue, and restore the production backend." + ) + + def __init__(self): + self.rogue_id: str = "ec2-rogue-test-001" + self.backend_id: str = "ec2-backend-prod-001" + self.prod_ids: Set[str] = set() + + def get_initial_state(self, seed: Optional[int] = None) -> Dict[str, Any]: + if seed is None: + self.rogue_id = "ec2-rogue-test-001" + self.backend_id = "ec2-backend-prod-001" + self.prod_ids = {"ec2-frontend-001", "ec2-frontend-002", + "ec2-api-prod-001", "ec2-backend-prod-001", "rds-prod-001"} + return self._get_fixed_state() + + rng = random.Random(seed) + resources = [] + + # THE ROGUE INSTANCE + rogue_id = _rand_id(rng, "ec2-rogue-test") + self.rogue_id = rogue_id + resources.append(asdict(ResourceInfo( + id=rogue_id, name="load-test-runner", type=ResourceType.EC2.value, + status=ResourceStatus.RUNNING.value, instance_size=rng.choice(_SIZES_LARGE), + cpu_utilization=100.0, memory_utilization=round(rng.uniform(85, 98), 1), + cost_per_hour=round(rng.uniform(0.40, 1.20), 2), + tags={"env": "test", "role": "load-testing", "owner": "qa-team", "note": "forgot to stop"}, + ))) + + # CRASHED PROD BACKEND + backend_id = _rand_id(rng, "ec2-backend-prod") + self.backend_id = backend_id + self.prod_ids = {backend_id} + resources.append(asdict(ResourceInfo( + id=backend_id, name="backend-primary", type=ResourceType.EC2.value, + status=ResourceStatus.STOPPED.value, instance_size="m5.xlarge", + cost_per_hour=0.192, + tags={"env": "prod", "role": "backend", "critical": "true"}, + ))) + + # Normal prod instances + for i in range(rng.randint(3, 6)): + role = rng.choice(["frontend", "api", "db-proxy", "gateway", "worker"]) + pid = _rand_id(rng, f"ec2-{role}-prod") + self.prod_ids.add(pid) + resources.append(asdict(ResourceInfo( + id=pid, name=f"{role}-{i+1}", type=ResourceType.EC2.value, + status=ResourceStatus.RUNNING.value, + instance_size=rng.choice(_SIZES_EC2[2:]), + cpu_utilization=round(rng.uniform(20, 75), 1), + memory_utilization=round(rng.uniform(25, 70), 1), + cost_per_hour=round(rng.uniform(0.03, 0.10), 4), + tags={"env": "prod", "role": role}, + ))) + + rng.shuffle(resources) + rogue_cost = next(r["cost_per_hour"] for r in resources if r["id"] == rogue_id) + alerts = [ + asdict(AlertInfo(alert_id="alert-crit-001", severity=AlertSeverity.CRITICAL.value, + message=f"Production backend '{backend_id}' is DOWN. HTTP 503 errors spiking.", + resource_id=backend_id, metric_name="HealthCheck", metric_value=0.0)), + asdict(AlertInfo(alert_id="alert-crit-002", severity=AlertSeverity.CRITICAL.value, + message=f"Abnormal CPU usage: '{rogue_id}' consuming 100% CPU. Investigate immediately.", + resource_id=rogue_id, metric_name="CPUUtilization", metric_value=100.0)), + asdict(AlertInfo(alert_id="alert-cost-002", severity=AlertSeverity.WARNING.value, + message=f"Hourly cost spike: ${rogue_cost:.2f}/hr from a single test instance.", + resource_id=rogue_id, metric_name="CostAnomaly", metric_value=rogue_cost)), + ] + total_cost = sum(r["cost_per_hour"] for r in resources) + return { + "resources": resources, "alerts": alerts, + "total_hourly_cost": round(total_cost, 4), + "system_uptime": round(rng.uniform(25, 45), 1), "budget_limit": None, + } + + def _get_fixed_state(self) -> Dict[str, Any]: + resources = [ + asdict(ResourceInfo(id="ec2-rogue-test-001", name="load-test-runner", type=ResourceType.EC2.value, + status=ResourceStatus.RUNNING.value, instance_size="c5.4xlarge", + cpu_utilization=100.0, memory_utilization=92.0, cost_per_hour=0.68, + tags={"env": "test", "role": "load-testing", "owner": "qa-team"})), + asdict(ResourceInfo(id="ec2-backend-prod-001", name="backend-primary", type=ResourceType.EC2.value, + status=ResourceStatus.STOPPED.value, instance_size="m5.xlarge", + cost_per_hour=0.192, + tags={"env": "prod", "role": "backend", "critical": "true"})), + asdict(ResourceInfo(id="ec2-frontend-001", name="frontend-1", type=ResourceType.EC2.value, + status=ResourceStatus.RUNNING.value, instance_size="t3.medium", + cpu_utilization=55.0, memory_utilization=40.0, + cost_per_hour=0.0416, tags={"env": "prod", "role": "frontend"})), + asdict(ResourceInfo(id="ec2-api-prod-001", name="api-gateway", type=ResourceType.EC2.value, + status=ResourceStatus.RUNNING.value, instance_size="t3.large", + cpu_utilization=75.0, memory_utilization=60.0, + cost_per_hour=0.0832, tags={"env": "prod", "role": "api"})), + asdict(ResourceInfo(id="rds-prod-001", name="prod-database", type=ResourceType.RDS.value, + status=ResourceStatus.RUNNING.value, instance_size="db.r5.large", + cpu_utilization=40.0, memory_utilization=55.0, + cost_per_hour=0.24, tags={"env": "prod", "role": "database"})), + ] + alerts = [ + asdict(AlertInfo(alert_id="alert-crit-001", severity=AlertSeverity.CRITICAL.value, + message="Production backend 'backend-primary' is DOWN. HTTP 503 errors spiking.", + resource_id="ec2-backend-prod-001", metric_name="HealthCheck", metric_value=0.0)), + asdict(AlertInfo(alert_id="alert-crit-002", severity=AlertSeverity.CRITICAL.value, + message="Abnormal CPU usage: 'ec2-rogue-test-001' consuming 100% CPU.", + resource_id="ec2-rogue-test-001", metric_name="CPUUtilization", metric_value=100.0)), + ] + total_cost = sum(r["cost_per_hour"] for r in resources) + return { + "resources": resources, "alerts": alerts, + "total_hourly_cost": round(total_cost, 4), + "system_uptime": 35.0, "budget_limit": None, + } + + def grade(self, action_history: List[Dict], final_state: Dict, initial_state: Dict) -> Tuple[float, Dict]: + score = 0.0 + breakdown = {"inspected_rogue": False, "terminated_rogue": False, + "rebooted_backend": False, "alerts_resolved": False, + "prod_terminated": [], "penalties": 0.0} + + has_inspected_rogue = False + for entry in action_history: + cmd, rid = entry.get("command"), entry.get("resource_id") + if cmd == "inspect" and rid == self.rogue_id: + has_inspected_rogue = True + if cmd == "terminate" and rid == self.rogue_id: + breakdown["terminated_rogue"] = True + if has_inspected_rogue: + breakdown["inspected_rogue"] = True + if cmd == "reboot" and rid == self.backend_id: + breakdown["rebooted_backend"] = True + if cmd == "terminate" and rid in self.prod_ids: + breakdown["prod_terminated"].append(rid) + + if breakdown["inspected_rogue"]: + score += 0.20 + if breakdown["terminated_rogue"]: + score += 0.30 + if breakdown["rebooted_backend"]: + score += 0.30 + if breakdown["terminated_rogue"] and breakdown["rebooted_backend"]: + breakdown["alerts_resolved"] = True + score += 0.20 + for _ in breakdown["prod_terminated"]: + score -= 0.50 + breakdown["penalties"] += 0.50 + + return round(max(0.0, min(1.0, score)), 2), breakdown + + +TASK_REGISTRY = { + "phantom_volume_cleanup": Task1PhantomVolumeCleanup, + "latency_spike_remediation": Task2LatencySpikeRemediation, + "noisy_neighbor_incident": Task3NoisyNeighborIncident, +} + + +# ═══════════════════════════════════════════════════════════════════════════════ +# ENVIRONMENT +# ═══════════════════════════════════════════════════════════════════════════════ + + +class CloudSREEnvironment(Environment): + """ + OpenEnv-compliant Cloud SRE & FinOps Environment. + + The agent manages a simulated cloud infrastructure: diagnosing outages, + terminating idle resources, scaling services, and optimizing costs + without causing collateral damage to production workloads. + + Supports 3 difficulty-tiered tasks with seeded procedural generation + and chaos event injection for robustness testing. + """ + + MAX_STEPS = 15 + + def __init__(self): + super().__init__() + self._env_state: Dict[str, Any] = {} + self._initial_state: Dict[str, Any] = {} + self._action_history: List[Dict[str, Any]] = [] + self._current_step: int = 0 + self._done: bool = False + self._task_id: str = "" + self._task = None + self._cumulative_reward: float = 0.0 + self._seed: Optional[int] = None + self._chaos_enabled: bool = False + self._sre_state = SREState() + + def reset(self, task_id: str = "phantom_volume_cleanup", + seed: Optional[int] = None) -> SREObservation: + """Reset to a specific task's initial state.""" + task_cls = TASK_REGISTRY.get(task_id) + if task_cls is None: + raise ValueError(f"Unknown task '{task_id}'. Available: {list(TASK_REGISTRY.keys())}") + + self._task = task_cls() + self._task_id = task_id + self._seed = seed + self._initial_state = self._task.get_initial_state(seed=seed) + self._env_state = copy.deepcopy(self._initial_state) + self._action_history = [] + self._current_step = 0 + self._done = False + self._cumulative_reward = 0.0 + self._chaos_enabled = seed is not None + + self._sre_state = SREState( + episode_id=str(uuid.uuid4()), + task_id=task_id, + ) + + return self._build_observation() + + def step(self, action: SREAction) -> SREObservation: + """Execute one agent action. Returns the resulting observation.""" + if self._done: + return self._build_observation() + + self._current_step += 1 + step_reward = 0.0 + + # Dispatch action + cmd = action.command + rid = action.resource_id + params = action.params or {} + + if cmd == ActionCommand.TERMINATE.value: + step_reward, _ = self._handle_terminate(rid) + elif cmd == ActionCommand.SCALE.value: + step_reward, _ = self._handle_scale(rid, params.get("target_size", "")) + elif cmd == ActionCommand.REBOOT.value: + step_reward, _ = self._handle_reboot(rid) + elif cmd == ActionCommand.INSPECT.value: + step_reward, _ = self._handle_inspect(rid) + elif cmd == ActionCommand.WAIT.value: + step_reward = -0.01 + + self._action_history.append({ + "step": self._current_step, + "command": cmd, "resource_id": rid, "params": params, + }) + + # Chaos injection + if self._chaos_enabled and self._seed is not None: + self._maybe_inject_chaos() + + # Recalculate + self._recalculate_state() + + if self._current_step >= self.MAX_STEPS: + self._done = True + + self._cumulative_reward += step_reward + self._sre_state.current_step = self._current_step + self._sre_state.step_count = self._current_step + self._sre_state.done = self._done + self._sre_state.cumulative_reward = round(self._cumulative_reward, 4) + self._sre_state.action_count = len(self._action_history) + + return self._build_observation() + + @property + def state(self) -> SREState: + """Return current episode state.""" + return self._sre_state + + def grade(self) -> Tuple[float, Dict]: + """Run the deterministic grader for the current task.""" + if self._task is None: + return 0.0, {"error": "No task loaded"} + return self._task.grade(self._action_history, self._env_state, self._initial_state) + + # ── Observation builder ── + + def _build_observation(self) -> SREObservation: + return SREObservation( + resources=self._env_state.get("resources", []), + alerts=self._env_state.get("alerts", []), + total_hourly_cost=self._env_state.get("total_hourly_cost", 0.0), + system_uptime=self._env_state.get("system_uptime", 100.0), + step_number=self._current_step, + max_steps=self.MAX_STEPS, + budget_limit=self._env_state.get("budget_limit"), + task_description=self._task.DESCRIPTION if self._task else "", + ) + + # ── Action handlers ── + + def _find_resource(self, resource_id: str) -> Optional[Dict]: + for r in self._env_state.get("resources", []): + if r["id"] == resource_id: + return r + return None + + def _resolve_alerts_for(self, resource_id: str): + self._env_state["alerts"] = [ + a for a in self._env_state.get("alerts", []) + if a.get("resource_id") != resource_id + ] + + def _handle_terminate(self, resource_id: Optional[str]) -> Tuple[float, str]: + if not resource_id: + return -0.05, "Error: No resource_id" + resource = self._find_resource(resource_id) + if resource is None: + return -0.05, f"Error: '{resource_id}' not found" + + tags = resource.get("tags", {}) + cost = resource.get("cost_per_hour", 0) + is_prod = tags.get("env") == "prod" + is_attached_ebs = (resource.get("type") == ResourceType.EBS.value + and resource.get("status") == ResourceStatus.IN_USE.value) + + if is_prod and resource.get("type") == ResourceType.EC2.value: + reward = -0.15 + elif is_attached_ebs: + reward = -0.10 + elif resource.get("status") == ResourceStatus.AVAILABLE.value: + reward = 0.05 + (cost * 0.02) + else: + reward = 0.02 + + self._env_state["resources"] = [r for r in self._env_state["resources"] if r["id"] != resource_id] + self._resolve_alerts_for(resource_id) + return reward, f"Terminated '{resource_id}'" + + def _handle_scale(self, resource_id: Optional[str], target_size: str) -> Tuple[float, str]: + if not resource_id or not target_size: + return -0.05, "Error: Missing resource_id or target_size" + resource = self._find_resource(resource_id) + if resource is None: + return -0.05, f"Error: '{resource_id}' not found" + + old_size = resource.get("instance_size", "unknown") + pre_mutation_cpu = resource.get("cpu_utilization", 0) + + if resource.get("type") == ResourceType.RDS.value: + if target_size not in RDS_PRICING: + return -0.05, f"Error: Invalid RDS size '{target_size}'" + new_cost = RDS_PRICING[target_size] + else: + new_cost = resource.get("cost_per_hour", 0) * 2.0 + + for r in self._env_state["resources"]: + if r["id"] == resource_id: + r["instance_size"] = target_size + r["cost_per_hour"] = new_cost + if r.get("cpu_utilization", 0) > 80: + r["cpu_utilization"] = 45.0 + break + + self._resolve_alerts_for(resource_id) + reward = 0.08 + if pre_mutation_cpu > 80: + reward += 0.05 + return reward, f"Scaled '{resource_id}' from {old_size} to {target_size}" + + def _handle_reboot(self, resource_id: Optional[str]) -> Tuple[float, str]: + if not resource_id: + return -0.05, "Error: No resource_id" + for r in self._env_state.get("resources", []): + if r["id"] == resource_id: + if r["status"] == ResourceStatus.STOPPED.value: + r["status"] = ResourceStatus.RUNNING.value + r["cpu_utilization"] = 15.0 + r["memory_utilization"] = 20.0 + self._env_state["system_uptime"] = min( + 100.0, self._env_state.get("system_uptime", 0) + 30.0) + self._resolve_alerts_for(resource_id) + return 0.10, f"Rebooted '{resource_id}' — now RUNNING" + elif r["status"] == ResourceStatus.RUNNING.value: + r["cpu_utilization"] = 10.0 + return -0.02, f"Rebooted '{resource_id}' — temporary disruption" + else: + return -0.05, f"Cannot reboot '{resource_id}' in state '{r['status']}'" + return -0.05, f"Error: '{resource_id}' not found" + + def _handle_inspect(self, resource_id: Optional[str]) -> Tuple[float, str]: + if not resource_id: + return -0.01, "Error: No resource_id" + resource = self._find_resource(resource_id) + if resource is None: + return -0.01, f"Error: '{resource_id}' not found" + return 0.01, f"Inspected '{resource_id}'" + + def _recalculate_state(self): + self._env_state["total_hourly_cost"] = round( + sum(r.get("cost_per_hour", 0) for r in self._env_state.get("resources", [])), 4) + critical = [a for a in self._env_state.get("alerts", []) if a.get("severity") == "critical"] + if not critical: + self._env_state["system_uptime"] = min(100.0, self._env_state.get("system_uptime", 100) + 10.0) + else: + self._env_state["system_uptime"] = max(0.0, self._env_state.get("system_uptime", 100) - 5.0) + + def _maybe_inject_chaos(self): + rng = random.Random(self._seed * 1000 + self._current_step) + if self._current_step < 4 or rng.random() > 0.25: + return + event = rng.choice(CHAOS_EVENTS) + if event["type"] == "new_alert": + alert = event["alert"].copy() + alert["alert_id"] = f"{alert['alert_id']}-step{self._current_step}" + self._env_state.setdefault("alerts", []).append(alert) + elif event["type"] == "cpu_spike": + running = [r for r in self._env_state.get("resources", []) + if r.get("status") == "running" and r.get("cpu_utilization", 0) < 80] + if running: + target = rng.choice(running) + target["cpu_utilization"] = round(rng.uniform(88, 96), 1) + elif event["type"] == "cost_drift": + drift = round(rng.uniform(0.05, 0.20), 4) + self._env_state["total_hourly_cost"] = round( + self._env_state.get("total_hourly_cost", 0) + drift, 4) diff --git a/envs/cloud_sre_env/server/requirements.txt b/envs/cloud_sre_env/server/requirements.txt new file mode 100644 index 000000000..059215147 --- /dev/null +++ b/envs/cloud_sre_env/server/requirements.txt @@ -0,0 +1,2 @@ +# Cloud SRE Environment — server dependencies +# No additional dependencies beyond openenv-core defaults (fastapi, pydantic, uvicorn) diff --git a/tests/envs/test_cloud_sre_environment.py b/tests/envs/test_cloud_sre_environment.py new file mode 100644 index 000000000..2d11b608b --- /dev/null +++ b/tests/envs/test_cloud_sre_environment.py @@ -0,0 +1,135 @@ +""" +Tests for the Cloud SRE Environment. + +Validates all three tasks (easy, medium, hard) with both fixed +and seeded states, plus grading correctness. +""" + +import pytest +from cloud_sre_env.server.cloud_sre_environment import ( + CloudSREEnvironment, TASK_REGISTRY, +) +from cloud_sre_env.models import SREAction, ActionCommand + + +class TestEnvironmentBasics: + """Test reset, step, state lifecycle.""" + + def test_reset_returns_observation(self): + env = CloudSREEnvironment() + obs = env.reset(task_id="phantom_volume_cleanup") + assert len(obs.resources) > 0 + assert obs.step_number == 0 + assert obs.task_description != "" + + def test_step_increments(self): + env = CloudSREEnvironment() + env.reset(task_id="phantom_volume_cleanup") + obs = env.step(SREAction(command="wait")) + assert obs.step_number == 1 + + def test_state_returns_sre_state(self): + env = CloudSREEnvironment() + env.reset(task_id="phantom_volume_cleanup") + state = env.state + assert state.task_id == "phantom_volume_cleanup" + assert state.episode_id != "" + + def test_max_steps_ends_episode(self): + env = CloudSREEnvironment() + env.reset(task_id="phantom_volume_cleanup") + for _ in range(15): + env.step(SREAction(command="wait")) + assert env.state.done is True + + def test_invalid_task_raises(self): + env = CloudSREEnvironment() + with pytest.raises(ValueError, match="Unknown task"): + env.reset(task_id="nonexistent_task") + + +class TestTask1PhantomVolumeCleanup: + """Test the easy task — orphan volume termination.""" + + def test_fixed_state_has_orphans(self): + env = CloudSREEnvironment() + obs = env.reset(task_id="phantom_volume_cleanup") + orphans = [r for r in obs.resources if r.get("status") == "available"] + assert len(orphans) == 3 + + def test_perfect_score(self): + env = CloudSREEnvironment() + env.reset(task_id="phantom_volume_cleanup") + for oid in ["ebs-orphan-001", "ebs-orphan-002", "ebs-orphan-003"]: + env.step(SREAction(command="terminate", resource_id=oid)) + score, breakdown = env.grade() + assert score == 1.0 + assert len(breakdown["orphans_terminated"]) == 3 + + def test_penalty_for_wrong_termination(self): + env = CloudSREEnvironment() + env.reset(task_id="phantom_volume_cleanup") + env.step(SREAction(command="terminate", resource_id="ec2-web-001")) + score, breakdown = env.grade() + assert score == 0.0 + assert len(breakdown["active_resources_terminated"]) == 1 + + def test_seeded_state_is_different(self): + env = CloudSREEnvironment() + obs1 = env.reset(task_id="phantom_volume_cleanup", seed=42) + obs2 = env.reset(task_id="phantom_volume_cleanup", seed=99) + ids1 = {r["id"] for r in obs1.resources} + ids2 = {r["id"] for r in obs2.resources} + assert ids1 != ids2 + + +class TestTask2LatencySpikeRemediation: + """Test the medium task — RDS scaling.""" + + def test_fixed_state_has_rds(self): + env = CloudSREEnvironment() + obs = env.reset(task_id="latency_spike_remediation") + rds = [r for r in obs.resources if r.get("type") == "rds_database"] + assert len(rds) >= 1 + + def test_scaling_rds_scores(self): + env = CloudSREEnvironment() + env.reset(task_id="latency_spike_remediation") + env.step(SREAction(command="scale", resource_id="rds-primary-001", + params={"target_size": "db.t3.medium"})) + score, breakdown = env.grade() + assert score >= 0.7 + assert breakdown["rds_scaled"] is True + + +class TestTask3NoisyNeighborIncident: + """Test the hard task — rogue instance investigation.""" + + def test_fixed_state_has_rogue_and_stopped_backend(self): + env = CloudSREEnvironment() + obs = env.reset(task_id="noisy_neighbor_incident") + ids = {r["id"] for r in obs.resources} + assert "ec2-rogue-test-001" in ids + assert "ec2-backend-prod-001" in ids + + def test_perfect_incident_response(self): + env = CloudSREEnvironment() + env.reset(task_id="noisy_neighbor_incident") + env.step(SREAction(command="inspect", resource_id="ec2-rogue-test-001")) + env.step(SREAction(command="terminate", resource_id="ec2-rogue-test-001")) + env.step(SREAction(command="reboot", resource_id="ec2-backend-prod-001")) + score, breakdown = env.grade() + assert score == 1.0 + assert breakdown["inspected_rogue"] is True + assert breakdown["terminated_rogue"] is True + assert breakdown["rebooted_backend"] is True + assert breakdown["alerts_resolved"] is True + + +class TestAllTasksRegistered: + """Ensure all tasks are in the registry.""" + + def test_registry_has_all_tasks(self): + assert "phantom_volume_cleanup" in TASK_REGISTRY + assert "latency_spike_remediation" in TASK_REGISTRY + assert "noisy_neighbor_incident" in TASK_REGISTRY