Merged
2 changes: 1 addition & 1 deletion .gitignore
@@ -1,7 +1,7 @@
*.swp
*.bak
real-values.yaml

.cursor

# Byte-compiled / optimized / DLL files
__pycache__/
64 changes: 64 additions & 0 deletions AGENT.md
@@ -0,0 +1,64 @@
# Agent Constraints

**Read this file before making any changes.** These constraints apply to all AI-assisted work in this repository. This file is the permanent **constraints** layer; task-specific **Goal / Constraints / Output Format / Failure Conditions** belong in copies of the templates under [docs/specs/templates/](docs/specs/templates/).

## Project Identity

This repository is the **chutes.ai** platform **API and validator** code: HTTP API, related services, Docker/deployment assets, and integration with the broader Chutes ecosystem (see [README.md](README.md)).

## Stack (Non-Negotiable)

- **Language**: Python `>=3.10,<3.13` (3.12 is typical for local work)
- **Package manager / runs**: **[uv](https://github.com/astral-sh/uv)** — install with `uv sync`; run tools with `uv run …` (e.g. `uv run pytest`)
- **Build**: **hatchling** ([pyproject.toml](pyproject.toml)); installable packages: **`api`**, **`metasync`**
- **HTTP API**: **FastAPI** + **Uvicorn**, default **ORJSONResponse** ([api/main.py](api/main.py))
- **Data**: **SQLAlchemy 2.x** + **asyncpg**, **Pydantic** / **pydantic-settings**; SQL migrations under [api/migrations/](api/migrations/) (timestamped `.sql`)
- **Ops / deps** (non-exhaustive): **Redis**, **loguru**, **httpx** / **aiohttp**, **Socket.IO** client, **aioboto3**, domain packages (`chutes`, Bittensor-related libs, attestation tooling as pulled in by pyproject)

Do not replace this stack with alternate frameworks or ORMs unless explicitly agreed. Do not introduce extra dependencies without discussion.

## Hard Rules

- **Never add a new dependency** without explicit approval
- **Configuration**: use **`api.config.settings`** (pydantic-settings) and environment variables; **no hardcoded secrets**, connection strings, or API keys
- **Database schema changes**: add a new file under [api/migrations/](api/migrations/) **and** keep ORM models in [api/database/orms.py](api/database/orms.py) in sync; describe the migration plan in PRs/specs
- **Lint/format**: **Ruff** only — `make lint` and `make reformat` ([makefiles/lint.mk](makefiles/lint.mk)); there is **no enforced coverage percentage** in CI — still add or update tests when behavior changes
- **`nv-attest/`**: excluded from Ruff in pyproject; do not “fix” it via repo-wide lint refactors unless scoped to that subtree
- **Crypto / attestation-sensitive code**: follow existing patterns in the relevant modules (e.g. server/attestation paths); never hardcode keys or measurements
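
To illustrate the configuration rule above, here is a minimal sketch of reading required values from the environment instead of hardcoding them. It uses only the standard library as a stand-in for pydantic-settings; `DemoSettings` and the `DEMO_*` variable names are hypothetical and are not part of `api.config`.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class DemoSettings:
    """Hypothetical settings object; the real one is api.config.settings."""

    database_url: str
    redis_url: str

    @classmethod
    def from_env(cls, env=None) -> "DemoSettings":
        # Read from the process environment unless a mapping is injected (handy in tests).
        env = os.environ if env is None else env
        required = ("DEMO_DATABASE_URL", "DEMO_REDIS_URL")
        missing = [name for name in required if name not in env]
        if missing:
            # Fail fast rather than falling back to a hardcoded default.
            raise RuntimeError(f"missing required environment variables: {missing}")
        return cls(
            database_url=env["DEMO_DATABASE_URL"],
            redis_url=env["DEMO_REDIS_URL"],
        )
```

Failing at startup when a variable is absent keeps secrets out of the source tree and makes misconfiguration visible early.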

## Patterns

- **Async-first** in request paths: **`async def`** handlers, **async** SQLAlchemy sessions (**`get_session`** and related helpers in [api/database/](api/database/)); avoid blocking I/O in handlers
- **Domain layout**: routes in **`api/<domain>/router.py`**, shared logic often in **`api/<domain>/util.py`** (match neighboring domains)
- **Models and settings**: ORM in **`api/database/orms.py`**; app settings via **`api.config`**
- **Tests**: under **`tests/unit/`** and **`tests/integration/`**; use **`uv run pytest`**. **Match test style** to the file you edit (this repo uses both plain **`def test_*`** and **`class Test*`**; stay consistent with surrounding tests)
- **Naming and structure**: follow existing modules in the same package; prefer small, focused changes over large unsolicited refactors
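
As an illustration of the async-first pattern, the sketch below offloads a blocking call to a worker thread with `asyncio.to_thread` so the event loop stays free. `slow_checksum` and `handler` are hypothetical names chosen for this example and do not exist in the repository.

```python
import asyncio
import hashlib


def slow_checksum(data: bytes) -> str:
    """Blocking work that should not run directly on the event loop."""
    return hashlib.sha256(data).hexdigest()


async def handler(data: bytes) -> dict:
    """Hypothetical async handler: runs the blocking call in a worker thread."""
    digest = await asyncio.to_thread(slow_checksum, data)
    return {"sha256": digest}
```

In a real handler the same idea applies to any synchronous library call that cannot be replaced with an async equivalent.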

## Architecture Overview


| Area | Purpose |
| ---------------------------------------------------- | ------------------------------------------------------------------------------------------------------------- |
| **[api/main.py](api/main.py)** | Main FastAPI app: lifespan, mounts domain routers (users, chutes, instances, invocations, payments, miner, …) |
| **`api/<domain>/`**                                  | Feature modules: `router.py`, `util.py`, and related schemas/helpers per domain                               |
| **[api/database/](api/database/)** | Async engine/session helpers; **[api/database/orms.py](api/database/orms.py)** ORM models |
| **[api/migrations/](api/migrations/)** | Ordered SQL migrations consumed at startup (see `lifespan` / `tasks.py`) |
| **[metasync/](metasync/)** | Metagraph / sync utilities (separate package in the same repo) |
| **`api/payment/`**, **`api/socket_server.py`**, etc. | Auxiliary services or entrypoints alongside the main app; follow local patterns when touching them            |
| **[nv-attest/](nv-attest/)** | Attestation-related subtree (own tooling; Ruff-excluded at repo root) |
| **`docker/`**, **[dev/dev.md](dev/dev.md)**          | Local Docker Compose and dev bootstrap                                                                        |


## Development Commands

```bash
uv sync --extra dev # Install project + dev dependencies (from repo root)
uv run pytest # Run tests (add paths or -k as needed)
make lint # Ruff check + format check
make reformat # Ruff format (line length per makefile)
make infrastructure # Docker compose up for test infra (see makefiles/development.mk)
```

Local full stack: see **[dev/dev.md](dev/dev.md)** (Docker network, `docker compose`, optional GPU compose files).

**Session handshake**: Before starting substantive work, confirm you have read this file and will follow it; for non-trivial features, bugfixes, or refactors, consider filling a copy of the appropriate template under [docs/specs/templates/](docs/specs/templates/).
49 changes: 47 additions & 2 deletions api/instance/util.py
@@ -31,7 +31,7 @@
from api.config import settings
from api.job.schemas import Job
from api.database import get_session
-from api.util import has_legacy_private_billing, notify_deleted, semcomp
+from api.util import has_legacy_private_billing, notify_deleted, notify_job_deleted, semcomp
from api.user.service import chutes_user_id
from api.bounty.util import create_bounty_if_not_exists, get_bounty_amount, send_bounty_notification
from sqlalchemy.future import select
@@ -43,7 +43,7 @@
from api.server.client import TeeServerClient
from api.server.schemas import Server
from api.server.exceptions import GetEvidenceError
-from api.server.service import verify_quote, verify_gpu_evidence
+from api.server.util import verify_quote, verify_gpu_evidence
from api.server.util import get_public_key_hash

# Define an alias for the Instance model to use in a subquery
@@ -1191,3 +1191,48 @@ async def is_instance_in_thrash_penalty(
        instance_created_at = instance_created_at.replace(tzinfo=None)

    return await is_thrashing_miner(db, miner_hotkey, chute_id, instance_created_at)


async def purge(target, reason, valid_termination=False):
    """Delete an instance from the database and clean up associated state."""
    async with get_session() as session:
        await session.execute(
            text("DELETE FROM instances WHERE instance_id = :instance_id"),
            {"instance_id": target.instance_id},
        )
        await session.execute(
            text(
                "UPDATE instance_audit SET deletion_reason = :reason, valid_termination = :valid_termination WHERE instance_id = :instance_id"
            ),
            {
                "instance_id": target.instance_id,
                "reason": reason,
                "valid_termination": valid_termination,
            },
        )

        job = (
            (await session.execute(select(Job).where(Job.instance_id == target.instance_id)))
            .unique()
            .scalar_one_or_none()
        )
        if job and not job.finished_at:
            job.status = "error"
            job.error_detail = f"Instance failed monitoring probes: {reason=}"
            job.miner_terminated = True
            job.finished_at = func.now()
            await notify_job_deleted(job)

        await session.commit()

    await cleanup_instance_conn_tracking(target.chute_id, target.instance_id)


async def purge_and_notify(target, reason, valid_termination=False):
    """Purge an instance and broadcast a deletion notification."""
    await purge(target, reason=reason, valid_termination=valid_termination)
    await notify_deleted(
        target,
        message=f"Instance {target.instance_id} of miner {target.miner_hotkey} deleted: {reason}",
    )
    await invalidate_instance_cache(target.chute_id, instance_id=target.instance_id)
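
The call order in `purge_and_notify` (database purge, then broadcast, then cache invalidation) can be exercised in isolation with stand-ins. In the sketch below, `purge`, `notify_deleted`, and `invalidate_instance_cache` are hypothetical stubs that only record their calls; they are not the real implementations, and the `target` object is a plain namespace invented for the example.

```python
import asyncio
from types import SimpleNamespace

calls: list[str] = []


async def purge(target, reason, valid_termination=False):
    # Stub: the real function deletes rows and updates the audit table.
    calls.append(f"purge:{target.instance_id}:{reason}:{valid_termination}")


async def notify_deleted(target, message):
    # Stub: the real function broadcasts a deletion event.
    calls.append(f"notify:{message}")


async def invalidate_instance_cache(chute_id, instance_id):
    # Stub: the real function clears cached instance state.
    calls.append(f"invalidate:{chute_id}:{instance_id}")


async def purge_and_notify(target, reason, valid_termination=False):
    """Same body as in the diff, wired to the stubs above."""
    await purge(target, reason=reason, valid_termination=valid_termination)
    await notify_deleted(
        target,
        message=f"Instance {target.instance_id} of miner {target.miner_hotkey} deleted: {reason}",
    )
    await invalidate_instance_cache(target.chute_id, instance_id=target.instance_id)


target = SimpleNamespace(instance_id="i-1", chute_id="c-1", miner_hotkey="hk-1")
asyncio.run(purge_and_notify(target, reason="maintenance", valid_termination=True))
```

Recording calls this way makes it easy to assert that notification never precedes the database purge.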
49 changes: 49 additions & 0 deletions api/migrations/20260403120000_server_maintenance.sql
@@ -0,0 +1,49 @@
-- migrate:up

CREATE TABLE IF NOT EXISTS tee_upgrade_windows (
    id VARCHAR PRIMARY KEY,
    upgrade_window_start TIMESTAMPTZ NOT NULL,
    upgrade_window_end TIMESTAMPTZ NOT NULL,
    target_measurement_version TEXT NOT NULL,
    max_concurrent_per_miner INTEGER NOT NULL DEFAULT 1,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    CONSTRAINT uq_tee_upgrade_target UNIQUE (target_measurement_version),
    CONSTRAINT chk_window_bounds CHECK (upgrade_window_end > upgrade_window_start)
);

CREATE INDEX IF NOT EXISTS idx_tee_upgrade_window_bounds
    ON tee_upgrade_windows (upgrade_window_start, upgrade_window_end);

ALTER TABLE servers
    ADD COLUMN IF NOT EXISTS maintenance_pending_window_id VARCHAR
    REFERENCES tee_upgrade_windows(id) ON DELETE SET NULL;

ALTER TABLE servers
    ADD COLUMN IF NOT EXISTS version TEXT;

CREATE INDEX IF NOT EXISTS idx_servers_maintenance_pending
    ON servers (miner_hotkey)
    WHERE maintenance_pending_window_id IS NOT NULL;

ALTER TABLE boot_attestations
    ADD COLUMN IF NOT EXISTS miner_hotkey VARCHAR;

ALTER TABLE boot_attestations
    ADD COLUMN IF NOT EXISTS vm_name VARCHAR;

CREATE INDEX IF NOT EXISTS idx_boot_miner_vm
    ON boot_attestations (miner_hotkey, vm_name);

-- migrate:down

DROP INDEX IF EXISTS idx_boot_miner_vm;

ALTER TABLE boot_attestations DROP COLUMN IF EXISTS vm_name;
ALTER TABLE boot_attestations DROP COLUMN IF EXISTS miner_hotkey;

DROP INDEX IF EXISTS idx_servers_maintenance_pending;

ALTER TABLE servers DROP COLUMN IF EXISTS version;
ALTER TABLE servers DROP COLUMN IF EXISTS maintenance_pending_window_id;

DROP TABLE IF EXISTS tee_upgrade_windows;
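
The constraints in this migration hint at how an upgrade window might be evaluated at runtime. The sketch below is an assumption for illustration and is not the logic in `api/server/service.py`: `Window`, `is_active`, and `has_free_slot` are hypothetical names, and treating the window as half-open (`start <= now < end`) is a choice made here, not something the migration specifies.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class Window:
    """Hypothetical in-memory mirror of a tee_upgrade_windows row."""

    upgrade_window_start: datetime
    upgrade_window_end: datetime
    max_concurrent_per_miner: int = 1  # same default as the column

    def __post_init__(self):
        # Mirrors the chk_window_bounds CHECK constraint.
        if self.upgrade_window_end <= self.upgrade_window_start:
            raise ValueError("upgrade_window_end must be after upgrade_window_start")


def is_active(window: Window, now: datetime) -> bool:
    """Assumed semantics: the window is active on [start, end)."""
    return window.upgrade_window_start <= now < window.upgrade_window_end


def has_free_slot(window: Window, slots_in_use: int) -> bool:
    """Assumed semantics: a miner may hold at most max_concurrent_per_miner slots."""
    return slots_in_use < window.max_concurrent_per_miner
```

With the default of one slot per miner, a second server could not enter maintenance until the first one leaves it, under these assumed semantics.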
4 changes: 4 additions & 0 deletions api/miner/schemas.py
@@ -24,6 +24,8 @@ class MinerServer(BaseModel):
    name: str
    ip: str
    is_tee: bool
    version: str | None = None
    maintenance_pending: bool = False
    created_at: str | None = None
    updated_at: str | None = None
    gpus: list[MinerServerGpu] = Field(default_factory=list)
@@ -38,6 +40,8 @@ def from_server(cls, server: Server) -> "MinerServer":
            name=server.name,
            ip=server.ip,
            is_tee=server.is_tee,
            version=server.version,
            maintenance_pending=server.maintenance_pending_window_id is not None,
            created_at=server.created_at.isoformat() if server.created_at else None,
            updated_at=server.updated_at.isoformat() if server.updated_at else None,
            gpus=[
97 changes: 96 additions & 1 deletion api/server/router.py
@@ -25,6 +25,12 @@
    BootAttestationResponse,
    RuntimeAttestationResponse,
    LuksPassphraseRequest,
    PreflightResult,
    ConfirmMaintenanceResult,
    MaintenancePolicyResponse,
    PendingServerInfo,
    TeeUpgradeWindow,
    UpgradeWindowInfo,
)
from api.server.service import (
    create_nonce,
@@ -38,6 +44,10 @@
    delete_server,
    validate_request_nonce,
    process_luks_passphrase_request,
    get_active_upgrade_window,
    preflight_maintenance,
    confirm_maintenance,
    _count_active_maintenance_slots,
)
from api.server.util import (
    decrypt_passphrase,
@@ -332,6 +342,55 @@ async def create_server(
)


@router.get("/maintenance/policy", response_model=MaintenancePolicyResponse)
async def get_maintenance_policy(
    db: AsyncSession = Depends(get_db_session),
    hotkey: str | None = Header(None, alias=HOTKEY_HEADER),
    _: User = Depends(
        get_current_user(purpose="tee", raise_not_found=False, registered_to=settings.netuid)
    ),
):
    """Return the active upgrade window, concurrency limits, and the miner's pending servers."""
    if not hotkey:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Hotkey header required")

    active_window = await get_active_upgrade_window(db)
    window_info: UpgradeWindowInfo | None = None
    current_slots = 0
    pending_servers: list[PendingServerInfo] = []

    if active_window is not None:
        window_info = UpgradeWindowInfo(
            id=active_window.id,
            target_measurement_version=active_window.target_measurement_version,
            upgrade_window_start=str(active_window.upgrade_window_start),
            upgrade_window_end=str(active_window.upgrade_window_end),
            max_concurrent_per_miner=active_window.max_concurrent_per_miner,
        )
        current_slots = await _count_active_maintenance_slots(db, hotkey, active_window)

        query = select(Server).where(
            Server.miner_hotkey == hotkey,
            Server.maintenance_pending_window_id.isnot(None),
        )
        result = await db.execute(query)
        for srv in result.scalars().all():
            pending_servers.append(
                PendingServerInfo(
                    server_id=srv.server_id,
                    name=srv.name,
                    version=srv.version,
                    target_version=active_window.target_measurement_version,
                )
            )

    return MaintenancePolicyResponse(
        active_window=window_info,
        current_slots=current_slots,
        pending_servers=pending_servers,
    )
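
The response assembly in `get_maintenance_policy` can be sketched as a pure function, which makes the "no active window" case easy to reason about. This is a hypothetical rewrite for illustration only: `build_policy` is not part of the PR, plain dicts stand in for the Pydantic response models, and it assumes pending servers are listed only while a window is active (which the use of `active_window.target_measurement_version` inside the loop suggests).

```python
from types import SimpleNamespace


def build_policy(active_window, slots_in_use, servers):
    """Hypothetical pure-function version of the policy response assembly."""
    if active_window is None:
        # No window: nothing to report, regardless of server state.
        return {"active_window": None, "current_slots": 0, "pending_servers": []}

    pending = [
        {
            "server_id": s.server_id,
            "name": s.name,
            "version": s.version,
            "target_version": active_window.target_measurement_version,
        }
        for s in servers
        if s.maintenance_pending_window_id is not None
    ]
    return {
        "active_window": {
            "id": active_window.id,
            "target_measurement_version": active_window.target_measurement_version,
            "upgrade_window_start": str(active_window.upgrade_window_start),
            "upgrade_window_end": str(active_window.upgrade_window_end),
            "max_concurrent_per_miner": active_window.max_concurrent_per_miner,
        },
        "current_slots": slots_in_use,
        "pending_servers": pending,
    }


# Example inputs; all values are made up for the sketch.
window = SimpleNamespace(
    id="w1",
    target_measurement_version="2.0",
    upgrade_window_start="2026-04-03T12:00:00+00:00",
    upgrade_window_end="2026-04-03T14:00:00+00:00",
    max_concurrent_per_miner=1,
)
servers = [
    SimpleNamespace(server_id="s1", name="a", version="1.0", maintenance_pending_window_id="w1"),
    SimpleNamespace(server_id="s2", name="b", version="1.0", maintenance_pending_window_id=None),
]
policy = build_policy(window, 1, servers)
```

Separating assembly from database access in this way would let the empty and populated cases be unit-tested without a session.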


@router.patch("/{server_id}", response_model=Dict[str, Any])
async def patch_server_name(
    server_id: str,
@@ -386,13 +445,21 @@ async def get_server_details(
    try:
        server = await check_server_ownership(db, server_id, hotkey)

-        return {
+        response: dict = {
            "server_id": server.server_id,
            "name": server.name,
            "ip": server.ip,
            "version": server.version,
            "maintenance_pending_window_id": server.maintenance_pending_window_id,
            "created_at": server.created_at.isoformat(),
            "updated_at": server.updated_at.isoformat() if server.updated_at else None,
        }
        if server.maintenance_pending_window_id is not None:
            window = await db.get(TeeUpgradeWindow, server.maintenance_pending_window_id)
            if window is not None:
                response["target_version"] = window.target_measurement_version

        return response

    except ServerNotFoundError as e:
        raise e
@@ -405,6 +472,34 @@ async def get_server_details(
)


@router.get("/{server_name_or_id}/maintenance/preflight", response_model=PreflightResult)
async def get_maintenance_preflight(
    server_name_or_id: str,
    db: AsyncSession = Depends(get_db_session),
    hotkey: str | None = Header(None, alias=HOTKEY_HEADER),
    _: User = Depends(
        get_current_user(purpose="tee", raise_not_found=False, registered_to=settings.netuid)
    ),
):
    """Check maintenance eligibility for a server without entering maintenance."""
    server = await get_server_by_name_or_id(db, hotkey, server_name_or_id)
    return await preflight_maintenance(db, server, hotkey)


@router.put("/{server_name_or_id}/maintenance", response_model=ConfirmMaintenanceResult)
async def put_confirm_maintenance(
    server_name_or_id: str,
    db: AsyncSession = Depends(get_db_session),
    hotkey: str | None = Header(None, alias=HOTKEY_HEADER),
    _: User = Depends(
        get_current_user(purpose="tee", raise_not_found=False, registered_to=settings.netuid)
    ),
):
    """Enter maintenance: purge instances and mark server for upgrade."""
    server = await get_server_by_name_or_id(db, hotkey, server_name_or_id)
    return await confirm_maintenance(db, server, hotkey)


@router.delete("/{server_name_or_id}", response_model=Dict[str, str])
async def remove_server(
    server_name_or_id: str,