Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dev = [
"ruff>=0.5.0",
"pytest>=7.0.0",
"pytest-asyncio",
"respx>=0.21.0",
"mypy",
"build",
"twine",
Expand Down
12 changes: 12 additions & 0 deletions src/vercel/_core/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""Core SDK infrastructure with shared code between sync and async clients."""

from __future__ import annotations

from .client import AsyncVercelClient, VercelClient
from .config import ClientConfig

__all__ = [
"VercelClient",
"AsyncVercelClient",
"ClientConfig",
]
77 changes: 77 additions & 0 deletions src/vercel/_core/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""Vercel API clients with namespaced sub-clients."""

from __future__ import annotations

from typing import Any

from .config import DEFAULT_API_BASE_URL, DEFAULT_TIMEOUT, ClientConfig
from .deployments import AsyncDeploymentsClient, DeploymentsClient
from .iter_coroutine import iter_coroutine
from .projects import AsyncProjectsClient, ProjectsClient
from .transport import AsyncTransport, BlockingTransport


class VercelClient:
"""Synchronous Vercel SDK client."""

def __init__(
self,
*,
access_token: str | None = None,
base_url: str | None = None,
timeout: float | None = None,
default_team_id: str | None = None,
default_slug: str | None = None,
):
self._config = ClientConfig(
access_token=access_token,
base_url=base_url or DEFAULT_API_BASE_URL,
timeout=timeout or DEFAULT_TIMEOUT,
default_team_id=default_team_id,
default_slug=default_slug,
)
self._transport = BlockingTransport(self._config)
self.projects = ProjectsClient(self._transport, self._config)
self.deployments = DeploymentsClient(self._transport, self._config)

def close(self) -> None:
iter_coroutine(self._transport.close())

def __enter__(self) -> VercelClient:
return self

def __exit__(self, *args: Any) -> None:
self.close()


class AsyncVercelClient:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
class AsyncVercelClient:
class AsyncIOVercelClient:

and we also need to add a TrioVercelClient

"""Asynchronous Vercel SDK client."""

def __init__(
self,
*,
access_token: str | None = None,
base_url: str | None = None,
timeout: float | None = None,
default_team_id: str | None = None,
default_slug: str | None = None,
):
self._config = ClientConfig(
access_token=access_token,
base_url=base_url or DEFAULT_API_BASE_URL,
timeout=timeout or DEFAULT_TIMEOUT,
default_team_id=default_team_id,
default_slug=default_slug,
)
self._transport = AsyncTransport(self._config)
self.projects = AsyncProjectsClient(self._transport, self._config)
self.deployments = AsyncDeploymentsClient(self._transport, self._config)

async def close(self) -> None:
await self._transport.close()

async def __aenter__(self) -> AsyncVercelClient:
return self

async def __aexit__(self, *args: Any) -> None:
await self.close()
41 changes: 41 additions & 0 deletions src/vercel/_core/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Client configuration."""

from __future__ import annotations

import os
from dataclasses import dataclass, field

DEFAULT_API_BASE_URL = "https://api.vercel.com"
DEFAULT_TIMEOUT = 60.0


@dataclass
class ClientConfig:
"""SDK configuration."""

access_token: str | None = None
base_url: str = DEFAULT_API_BASE_URL
timeout: float = DEFAULT_TIMEOUT
default_team_id: str | None = None
default_slug: str | None = None
headers: dict[str, str] = field(default_factory=dict)

def resolve_token(self) -> str:
env_token = os.getenv("VERCEL_TOKEN")
resolved = self.access_token or env_token
if not resolved:
raise RuntimeError(
"Missing Vercel API token. Pass access_token=... or set VERCEL_TOKEN."
)
return resolved

def get_auth_headers(self) -> dict[str, str]:
return {
"authorization": f"Bearer {self.resolve_token()}",
"accept": "application/json",
"content-type": "application/json",
**self.headers,
}

def build_url(self, path: str) -> str:
return self.base_url.rstrip("/") + path
202 changes: 202 additions & 0 deletions src/vercel/_core/deployments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
"""Deployments API client."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any

import httpx

from .._telemetry.tracker import track
from .iter_coroutine import iter_coroutine

if TYPE_CHECKING:
from .config import ClientConfig
from .transport import BaseTransport


class BaseDeploymentsClient:
"""Base deployments client with shared async business logic."""

def __init__(self, transport: BaseTransport, config: ClientConfig):
self._transport = transport
self._config = config

def _build_params(
self,
team_id: str | None = None,
slug: str | None = None,
force_new: bool | None = None,
skip_auto_detection_confirmation: bool | None = None,
) -> dict[str, Any]:
params: dict[str, Any] = {}
resolved_team_id = team_id or self._config.default_team_id
resolved_slug = slug or self._config.default_slug
if resolved_team_id:
params["teamId"] = resolved_team_id
if resolved_slug:
params["slug"] = resolved_slug
if force_new is not None:
params["forceNew"] = "1" if force_new else "0"
if skip_auto_detection_confirmation is not None:
params["skipAutoDetectionConfirmation"] = (
"1" if skip_auto_detection_confirmation else "0"
)
return params

def _handle_error(self, response: httpx.Response, operation: str) -> None:
try:
data = response.json()
except Exception:
data = {"error": response.text}
raise RuntimeError(
f"Failed to {operation}: {response.status_code} "
f"{response.reason_phrase} - {data}"
)

async def _create(
self,
*,
body: dict[str, Any],
team_id: str | None = None,
slug: str | None = None,
force_new: bool | None = None,
skip_auto_detection_confirmation: bool | None = None,
) -> dict[str, Any]:
if not isinstance(body, dict):
raise ValueError("body must be a dict")

params = self._build_params(
team_id, slug, force_new, skip_auto_detection_confirmation
)

response = await self._transport.send(
"POST", "/v13/deployments", params=params, json=body
)

if not (200 <= response.status_code < 300):
self._handle_error(response, "create deployment")

track(
"deployment_create",
token=self._config.access_token,
target=body.get("target"),
force_new=bool(force_new) if force_new is not None else None,
)
return response.json()

async def _upload_file(
self,
*,
content: bytes | bytearray | memoryview,
content_length: int,
x_vercel_digest: str | None = None,
x_now_digest: str | None = None,
x_now_size: int | None = None,
team_id: str | None = None,
slug: str | None = None,
) -> dict[str, Any]:
params = self._build_params(team_id, slug)

headers: dict[str, str] = {
"content-type": "application/octet-stream",
"Content-Length": str(content_length),
}
if x_vercel_digest:
headers["x-vercel-digest"] = x_vercel_digest
if x_now_digest:
headers["x-now-digest"] = x_now_digest
if x_now_size is not None:
headers["x-now-size"] = str(x_now_size)

response = await self._transport.send(
"POST", "/v2/files", params=params, content=bytes(content), headers=headers
)

if not (200 <= response.status_code < 300):
self._handle_error(response, "upload file")

return response.json()


class DeploymentsClient(BaseDeploymentsClient):
def create(
self,
*,
body: dict[str, Any],
team_id: str | None = None,
slug: str | None = None,
force_new: bool | None = None,
skip_auto_detection_confirmation: bool | None = None,
) -> dict[str, Any]:
return iter_coroutine(
self._create(
body=body,
team_id=team_id,
slug=slug,
force_new=force_new,
skip_auto_detection_confirmation=skip_auto_detection_confirmation,
)
)

def upload_file(
self,
*,
content: bytes | bytearray | memoryview,
content_length: int,
x_vercel_digest: str | None = None,
x_now_digest: str | None = None,
x_now_size: int | None = None,
team_id: str | None = None,
slug: str | None = None,
) -> dict[str, Any]:
return iter_coroutine(
self._upload_file(
content=content,
content_length=content_length,
x_vercel_digest=x_vercel_digest,
x_now_digest=x_now_digest,
x_now_size=x_now_size,
team_id=team_id,
slug=slug,
)
)


class AsyncDeploymentsClient(BaseDeploymentsClient):
async def create(
self,
*,
body: dict[str, Any],
team_id: str | None = None,
slug: str | None = None,
force_new: bool | None = None,
skip_auto_detection_confirmation: bool | None = None,
) -> dict[str, Any]:
return await self._create(
body=body,
team_id=team_id,
slug=slug,
force_new=force_new,
skip_auto_detection_confirmation=skip_auto_detection_confirmation,
)

async def upload_file(
self,
*,
content: bytes | bytearray | memoryview,
content_length: int,
x_vercel_digest: str | None = None,
x_now_digest: str | None = None,
x_now_size: int | None = None,
team_id: str | None = None,
slug: str | None = None,
) -> dict[str, Any]:
return await self._upload_file(
content=content,
content_length=content_length,
x_vercel_digest=x_vercel_digest,
x_now_digest=x_now_digest,
x_now_size=x_now_size,
team_id=team_id,
slug=slug,
)
19 changes: 19 additions & 0 deletions src/vercel/_core/iter_coroutine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Run simple coroutines synchronously."""

from __future__ import annotations

import typing

_T = typing.TypeVar("_T")


def iter_coroutine(coro: typing.Coroutine[None, None, _T]) -> _T:
"""Execute a coroutine that completes without suspending."""
try:
coro.send(None)
except StopIteration as ex:
return ex.value # type: ignore [no-any-return]
else:
raise RuntimeError(f"coroutine {coro!r} did not stop after one iteration!")
finally:
coro.close()
Loading
Loading