diff --git a/pyproject.toml b/pyproject.toml index 685e95a..c7ec6e8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ dev = [ "ruff>=0.5.0", "pytest>=7.0.0", "pytest-asyncio", + "respx>=0.21.0", "mypy", "build", "twine", diff --git a/src/vercel/_core/__init__.py b/src/vercel/_core/__init__.py new file mode 100644 index 0000000..5384421 --- /dev/null +++ b/src/vercel/_core/__init__.py @@ -0,0 +1,12 @@ +"""Core SDK infrastructure with shared code between sync and async clients.""" + +from __future__ import annotations + +from .client import AsyncVercelClient, VercelClient +from .config import ClientConfig + +__all__ = [ + "VercelClient", + "AsyncVercelClient", + "ClientConfig", +] diff --git a/src/vercel/_core/client.py b/src/vercel/_core/client.py new file mode 100644 index 0000000..9746623 --- /dev/null +++ b/src/vercel/_core/client.py @@ -0,0 +1,77 @@ +"""Vercel API clients with namespaced sub-clients.""" + +from __future__ import annotations + +from typing import Any + +from .config import DEFAULT_API_BASE_URL, DEFAULT_TIMEOUT, ClientConfig +from .deployments import AsyncDeploymentsClient, DeploymentsClient +from .iter_coroutine import iter_coroutine +from .projects import AsyncProjectsClient, ProjectsClient +from .transport import AsyncTransport, BlockingTransport + + +class VercelClient: + """Synchronous Vercel SDK client.""" + + def __init__( + self, + *, + access_token: str | None = None, + base_url: str | None = None, + timeout: float | None = None, + default_team_id: str | None = None, + default_slug: str | None = None, + ): + self._config = ClientConfig( + access_token=access_token, + base_url=base_url or DEFAULT_API_BASE_URL, + timeout=timeout or DEFAULT_TIMEOUT, + default_team_id=default_team_id, + default_slug=default_slug, + ) + self._transport = BlockingTransport(self._config) + self.projects = ProjectsClient(self._transport, self._config) + self.deployments = DeploymentsClient(self._transport, self._config) + + def close(self) -> None: + iter_coroutine(self._transport.close()) + + def __enter__(self) -> VercelClient: + return self + + def __exit__(self, *args: Any) -> None: + self.close() + + +class AsyncVercelClient: + """Asynchronous Vercel SDK client.""" + + def __init__( + self, + *, + access_token: str | None = None, + base_url: str | None = None, + timeout: float | None = None, + default_team_id: str | None = None, + default_slug: str | None = None, + ): + self._config = ClientConfig( + access_token=access_token, + base_url=base_url or DEFAULT_API_BASE_URL, + timeout=timeout or DEFAULT_TIMEOUT, + default_team_id=default_team_id, + default_slug=default_slug, + ) + self._transport = AsyncTransport(self._config) + self.projects = AsyncProjectsClient(self._transport, self._config) + self.deployments = AsyncDeploymentsClient(self._transport, self._config) + + async def close(self) -> None: + await self._transport.close() + + async def __aenter__(self) -> AsyncVercelClient: + return self + + async def __aexit__(self, *args: Any) -> None: + await self.close() diff --git a/src/vercel/_core/config.py b/src/vercel/_core/config.py new file mode 100644 index 0000000..f5af45c --- /dev/null +++ b/src/vercel/_core/config.py @@ -0,0 +1,41 @@ +"""Client configuration.""" + +from __future__ import annotations + +import os +from dataclasses import dataclass, field + +DEFAULT_API_BASE_URL = "https://api.vercel.com" +DEFAULT_TIMEOUT = 60.0 + + +@dataclass +class ClientConfig: + """SDK configuration.""" + + access_token: str | None = None + base_url: str = DEFAULT_API_BASE_URL + timeout: float = DEFAULT_TIMEOUT + default_team_id: str | None = None + default_slug: str | None = None + headers: dict[str, str] = field(default_factory=dict) + + def resolve_token(self) -> str: + env_token = os.getenv("VERCEL_TOKEN") + resolved = self.access_token or env_token + if not resolved: + raise RuntimeError( + "Missing Vercel API token. Pass access_token=... or set VERCEL_TOKEN." + ) + return resolved + + def get_auth_headers(self) -> dict[str, str]: + return { + "authorization": f"Bearer {self.resolve_token()}", + "accept": "application/json", + "content-type": "application/json", + **self.headers, + } + + def build_url(self, path: str) -> str: + return self.base_url.rstrip("/") + path diff --git a/src/vercel/_core/deployments.py b/src/vercel/_core/deployments.py new file mode 100644 index 0000000..71803d8 --- /dev/null +++ b/src/vercel/_core/deployments.py @@ -0,0 +1,202 @@ +"""Deployments API client.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +import httpx + +from .._telemetry.tracker import track +from .iter_coroutine import iter_coroutine + +if TYPE_CHECKING: + from .config import ClientConfig + from .transport import BaseTransport + + +class BaseDeploymentsClient: + """Base deployments client with shared async business logic.""" + + def __init__(self, transport: BaseTransport, config: ClientConfig): + self._transport = transport + self._config = config + + def _build_params( + self, + team_id: str | None = None, + slug: str | None = None, + force_new: bool | None = None, + skip_auto_detection_confirmation: bool | None = None, + ) -> dict[str, Any]: + params: dict[str, Any] = {} + resolved_team_id = team_id or self._config.default_team_id + resolved_slug = slug or self._config.default_slug + if resolved_team_id: + params["teamId"] = resolved_team_id + if resolved_slug: + params["slug"] = resolved_slug + if force_new is not None: + params["forceNew"] = "1" if force_new else "0" + if skip_auto_detection_confirmation is not None: + params["skipAutoDetectionConfirmation"] = ( + "1" if skip_auto_detection_confirmation else "0" + ) + return params + + def _handle_error(self, response: httpx.Response, operation: str) -> None: + try: + data = response.json() + except Exception: + data = {"error": response.text} + raise RuntimeError( + f"Failed to {operation}: {response.status_code} " + f"{response.reason_phrase} - {data}" + ) + + async def _create( + self, + *, + body: dict[str, Any], + team_id: str | None = None, + slug: str | None = None, + force_new: bool | None = None, + skip_auto_detection_confirmation: bool | None = None, + ) -> dict[str, Any]: + if not isinstance(body, dict): + raise ValueError("body must be a dict") + + params = self._build_params( + team_id, slug, force_new, skip_auto_detection_confirmation + ) + + response = await self._transport.send( + "POST", "/v13/deployments", params=params, json=body + ) + + if not (200 <= response.status_code < 300): + self._handle_error(response, "create deployment") + + track( + "deployment_create", + token=self._config.access_token, + target=body.get("target"), + force_new=bool(force_new) if force_new is not None else None, + ) + return response.json() + + async def _upload_file( + self, + *, + content: bytes | bytearray | memoryview, + content_length: int, + x_vercel_digest: str | None = None, + x_now_digest: str | None = None, + x_now_size: int | None = None, + team_id: str | None = None, + slug: str | None = None, + ) -> dict[str, Any]: + params = self._build_params(team_id, slug) + + headers: dict[str, str] = { + "content-type": "application/octet-stream", + "Content-Length": str(content_length), + } + if x_vercel_digest: + headers["x-vercel-digest"] = x_vercel_digest + if x_now_digest: + headers["x-now-digest"] = x_now_digest + if x_now_size is not None: + headers["x-now-size"] = str(x_now_size) + + response = await self._transport.send( + "POST", "/v2/files", params=params, content=bytes(content), headers=headers + ) + + if not (200 <= response.status_code < 300): + self._handle_error(response, "upload file") + + return response.json() + + +class DeploymentsClient(BaseDeploymentsClient): + def create( + self, + *, + body: dict[str, Any], + team_id: str | None = None, + slug: str | None = None, + force_new: bool | None = None, + skip_auto_detection_confirmation: bool | None = None, + ) -> dict[str, Any]: + return iter_coroutine( + self._create( + body=body, + team_id=team_id, + slug=slug, + force_new=force_new, + skip_auto_detection_confirmation=skip_auto_detection_confirmation, + ) + ) + + def upload_file( + self, + *, + content: bytes | bytearray | memoryview, + content_length: int, + x_vercel_digest: str | None = None, + x_now_digest: str | None = None, + x_now_size: int | None = None, + team_id: str | None = None, + slug: str | None = None, + ) -> dict[str, Any]: + return iter_coroutine( + self._upload_file( + content=content, + content_length=content_length, + x_vercel_digest=x_vercel_digest, + x_now_digest=x_now_digest, + x_now_size=x_now_size, + team_id=team_id, + slug=slug, + ) + ) + + +class AsyncDeploymentsClient(BaseDeploymentsClient): + async def create( + self, + *, + body: dict[str, Any], + team_id: str | None = None, + slug: str | None = None, + force_new: bool | None = None, + skip_auto_detection_confirmation: bool | None = None, + ) -> dict[str, Any]: + return await self._create( + body=body, + team_id=team_id, + slug=slug, + force_new=force_new, + skip_auto_detection_confirmation=skip_auto_detection_confirmation, + ) + + async def upload_file( + self, + *, + content: bytes | bytearray | memoryview, + content_length: int, + x_vercel_digest: str | None = None, + x_now_digest: str | None = None, + x_now_size: int | None = None, + team_id: str | None = None, + slug: str | None = None, + ) -> dict[str, Any]: + return await self._upload_file( + content=content, + content_length=content_length, + x_vercel_digest=x_vercel_digest, + x_now_digest=x_now_digest, + x_now_size=x_now_size, + team_id=team_id, + slug=slug, + ) diff --git a/src/vercel/_core/iter_coroutine.py b/src/vercel/_core/iter_coroutine.py new file mode 100644 index 0000000..c52c56b --- /dev/null +++ b/src/vercel/_core/iter_coroutine.py @@ -0,0 +1,19 @@ +"""Run simple coroutines synchronously.""" + +from __future__ import annotations + +import typing + +_T = typing.TypeVar("_T") + + +def iter_coroutine(coro: typing.Coroutine[None, None, _T]) -> _T: + """Execute a coroutine that completes without suspending.""" + try: + coro.send(None) + except StopIteration as ex: + return ex.value # type: ignore [no-any-return] + else: + raise RuntimeError(f"coroutine {coro!r} did not stop after one iteration!") + finally: + coro.close() diff --git a/src/vercel/_core/projects.py b/src/vercel/_core/projects.py new file mode 100644 index 0000000..2604649 --- /dev/null +++ b/src/vercel/_core/projects.py @@ -0,0 +1,199 @@ +"""Projects API client.""" + +from __future__ import annotations + +import urllib.parse +from typing import TYPE_CHECKING, Any + +import httpx + +from .._telemetry.tracker import track +from .iter_coroutine import iter_coroutine + +if TYPE_CHECKING: + from .config import ClientConfig + from .transport import BaseTransport + + +class BaseProjectsClient: + """Base projects client with shared async business logic.""" + + def __init__(self, transport: BaseTransport, config: ClientConfig): + self._transport = transport + self._config = config + + def _build_params( + self, + team_id: str | None = None, + slug: str | None = None, + ) -> dict[str, Any]: + params: dict[str, Any] = {} + resolved_team_id = team_id or self._config.default_team_id + resolved_slug = slug or self._config.default_slug + if resolved_team_id: + params["teamId"] = resolved_team_id + if resolved_slug: + params["slug"] = resolved_slug + return params + + def _handle_error(self, response: httpx.Response, operation: str) -> None: + try: + data = response.json() + except Exception: + data = {"error": response.text} + raise RuntimeError( + f"Failed to {operation}: {response.status_code} " + f"{response.reason_phrase} - {data}" + ) + + async def _list( + self, + *, + team_id: str | None = None, + slug: str | None = None, + query: dict[str, Any] | None = None, + ) -> dict[str, Any]: + params = self._build_params(team_id, slug) + if query: + params.update(query) + + response = await self._transport.send("GET", "/v10/projects", params=params) + + if response.status_code != 200: + self._handle_error(response, "list projects") + + return response.json() + + async def _create( + self, + *, + body: dict[str, Any], + team_id: str | None = None, + slug: str | None = None, + ) -> dict[str, Any]: + params = self._build_params(team_id, slug) + response = await self._transport.send( + "POST", "/v11/projects", params=params, json=body + ) + + if not (200 <= response.status_code < 300): + self._handle_error(response, "create project") + + track("project_create", token=self._config.access_token) + return response.json() + + async def _update( + self, + id_or_name: str, + *, + body: dict[str, Any], + team_id: str | None = None, + slug: str | None = None, + ) -> dict[str, Any]: + params = self._build_params(team_id, slug) + path = f"/v9/projects/{urllib.parse.quote(id_or_name, safe='')}" + + response = await self._transport.send("PATCH", path, params=params, json=body) + + if response.status_code != 200: + self._handle_error(response, "update project") + + track("project_update", token=self._config.access_token) + return response.json() + + async def _delete( + self, + id_or_name: str, + *, + team_id: str | None = None, + slug: str | None = None, + ) -> None: + params = self._build_params(team_id, slug) + path = f"/v9/projects/{urllib.parse.quote(id_or_name, safe='')}" + + response = await self._transport.send("DELETE", path, params=params) + + if response.status_code != 204: + self._handle_error(response, "delete project") + + track("project_delete", token=self._config.access_token) + + +class ProjectsClient(BaseProjectsClient): + def list( + self, + *, + team_id: str | None = None, + slug: str | None = None, + query: dict[str, Any] | None = None, + ) -> dict[str, Any]: + return iter_coroutine(self._list(team_id=team_id, slug=slug, query=query)) + + def create( + self, + *, + body: dict[str, Any], + team_id: str | None = None, + slug: str | None = None, + ) -> dict[str, Any]: + return iter_coroutine(self._create(body=body, team_id=team_id, slug=slug)) + + def update( + self, + id_or_name: str, + *, + body: dict[str, Any], + team_id: str | None = None, + slug: str | None = None, + ) -> dict[str, Any]: + return iter_coroutine( + self._update(id_or_name, body=body, team_id=team_id, slug=slug) + ) + + def delete( + self, + id_or_name: str, + *, + team_id: str | None = None, + slug: str | None = None, + ) -> None: + return iter_coroutine(self._delete(id_or_name, team_id=team_id, slug=slug)) + + +class AsyncProjectsClient(BaseProjectsClient): + async def list( + self, + *, + team_id: str | None = None, + slug: str | None = None, + query: dict[str, Any] | None = None, + ) -> dict[str, Any]: + return await self._list(team_id=team_id, slug=slug, query=query) + + async def create( + self, + *, + body: dict[str, Any], + team_id: str | None = None, + slug: str | None = None, + ) -> dict[str, Any]: + return await self._create(body=body, team_id=team_id, slug=slug) + + async def update( + self, + id_or_name: str, + *, + body: dict[str, Any], + team_id: str | None = None, + slug: str | None = None, + ) -> dict[str, Any]: + return await self._update(id_or_name, body=body, team_id=team_id, slug=slug) + + async def delete( + self, + id_or_name: str, + *, + team_id: str | None = None, + slug: str | None = None, + ) -> None: + return await self._delete(id_or_name, team_id=team_id, slug=slug) diff --git a/src/vercel/_core/transport.py b/src/vercel/_core/transport.py new file mode 100644 index 0000000..dd3fa47 --- /dev/null +++ b/src/vercel/_core/transport.py @@ -0,0 +1,120 @@ +"""Transport layer for HTTP operations.""" + +from __future__ import annotations + +import abc +from typing import Any + +import httpx + +from .config import ClientConfig + + +class BaseTransport(abc.ABC): + """Abstract transport with async interface.""" + + def __init__(self, config: ClientConfig): + self.config = config + + @abc.abstractmethod + async def send( + self, + method: str, + path: str, + *, + params: dict[str, Any] | None = None, + json: Any | None = None, + content: bytes | None = None, + headers: dict[str, str] | None = None, + ) -> httpx.Response: ... + + @abc.abstractmethod + async def close(self) -> None: ... + + +class BlockingTransport(BaseTransport): + """Sync I/O transport. Methods are async def but don't suspend.""" + + def __init__(self, config: ClientConfig): + super().__init__(config) + self._client: httpx.Client | None = None + + def _get_client(self) -> httpx.Client: + if self._client is None: + self._client = httpx.Client( + timeout=httpx.Timeout(self.config.timeout), + ) + return self._client + + async def send( + self, + method: str, + path: str, + *, + params: dict[str, Any] | None = None, + json: Any | None = None, + content: bytes | None = None, + headers: dict[str, str] | None = None, + ) -> httpx.Response: + url = self.config.build_url(path) + request_headers = self.config.get_auth_headers() + if headers: + request_headers.update(headers) + + return self._get_client().request( + method, + url, + params=params, + json=json, + content=content, + headers=request_headers, + ) + + async def close(self) -> None: + if self._client is not None: + self._client.close() + self._client = None + + +class AsyncTransport(BaseTransport): + """Async I/O transport using httpx.AsyncClient.""" + + def __init__(self, config: ClientConfig): + super().__init__(config) + self._client: httpx.AsyncClient | None = None + + def _get_client(self) -> httpx.AsyncClient: + if self._client is None: + self._client = httpx.AsyncClient( + timeout=httpx.Timeout(self.config.timeout), + ) + return self._client + + async def send( + self, + method: str, + path: str, + *, + params: dict[str, Any] | None = None, + json: Any | None = None, + content: bytes | None = None, + headers: dict[str, str] | None = None, + ) -> httpx.Response: + url = self.config.build_url(path) + request_headers = self.config.get_auth_headers() + if headers: + request_headers.update(headers) + + return await self._get_client().request( + method, + url, + params=params, + json=json, + content=content, + headers=request_headers, + ) + + async def close(self) -> None: + if self._client is not None: + await self._client.aclose() + self._client = None diff --git a/tests/test_core_client.py b/tests/test_core_client.py new file mode 100644 index 0000000..9aad8ed --- /dev/null +++ b/tests/test_core_client.py @@ -0,0 +1,280 @@ +"""Tests for the core client with namespaced sub-clients.""" + +import httpx +import pytest +import respx + +from vercel._core import ( + AsyncVercelClient, + ClientConfig, + VercelClient, +) + + +@pytest.fixture +def api_mock(): + """Mock API for testing.""" + with respx.mock(assert_all_called=False, base_url="https://api.vercel.com") as mock: + # GET /v10/projects + mock.get("/v10/projects").mock( + return_value=httpx.Response( + 200, json={"projects": [{"id": "1"}, {"id": "2"}], "pagination": {}} + ) + ) + + # POST /v11/projects + mock.post("/v11/projects").mock( + return_value=httpx.Response(200, json={"id": "new", "name": "Created"}) + ) + + # PATCH /v9/projects/:id + mock.patch("/v9/projects/123").mock( + return_value=httpx.Response(200, json={"id": "123", "name": "Updated"}) + ) + + # DELETE /v9/projects/:id + mock.delete("/v9/projects/123").mock(return_value=httpx.Response(204)) + + # POST /v13/deployments + mock.post("/v13/deployments").mock( + return_value=httpx.Response(200, json={"id": "dpl_1", "url": "test.vercel.app"}) + ) + + # POST /v2/files + mock.post("/v2/files").mock( + return_value=httpx.Response(200, json={"url": "file-url"}) + ) + + yield mock + + +class TestSyncClient: + """Test synchronous VercelClient with namespaced API.""" + + def test_projects_list(self, api_mock): + """Test client.projects.list().""" + with VercelClient(access_token="test-token") as client: + response = client.projects.list() + + assert "projects" in response + assert len(response["projects"]) == 2 + + def test_projects_list_with_team(self, api_mock): + """Test client.projects.list() with team_id.""" + with VercelClient(access_token="test-token") as client: + response = client.projects.list(team_id="team_123") + + assert "projects" in response + request = api_mock.calls[0].request + assert "teamId=team_123" in str(request.url) + + def test_projects_create(self, api_mock): + """Test client.projects.create().""" + with VercelClient(access_token="test-token") as client: + response = client.projects.create(body={"name": "my-project"}) + + assert response["id"] == "new" + assert response["name"] == "Created" + + def test_projects_update(self, api_mock): + """Test client.projects.update().""" + with VercelClient(access_token="test-token") as client: + response = client.projects.update("123", body={"name": "Updated"}) + + assert response["id"] == "123" + assert response["name"] == "Updated" + + def test_projects_delete(self, api_mock): + """Test client.projects.delete().""" + with VercelClient(access_token="test-token") as client: + result = client.projects.delete("123") + + assert result is None + + def test_deployments_create(self, api_mock): + """Test client.deployments.create().""" + with VercelClient(access_token="test-token") as client: + response = client.deployments.create(body={"name": "my-deployment"}) + + assert response["id"] == "dpl_1" + assert response["url"] == "test.vercel.app" + + def test_deployments_upload_file(self, api_mock): + """Test client.deployments.upload_file().""" + with VercelClient(access_token="test-token") as client: + response = client.deployments.upload_file( + content=b"hello world", + content_length=11, + x_vercel_digest="abc123", + ) + + assert response["url"] == "file-url" + + +class TestAsyncClient: + """Test asynchronous AsyncVercelClient with namespaced API.""" + + @pytest.mark.asyncio + async def test_projects_list(self, api_mock): + """Test await client.projects.list().""" + async with AsyncVercelClient(access_token="test-token") as client: + response = await client.projects.list() + + assert "projects" in response + assert len(response["projects"]) == 2 + + @pytest.mark.asyncio + async def test_projects_list_with_team(self, api_mock): + """Test await client.projects.list() with team_id.""" + async with AsyncVercelClient(access_token="test-token") as client: + response = await client.projects.list(team_id="team_123") + + assert "projects" in response + request = api_mock.calls[0].request + assert "teamId=team_123" in str(request.url) + + @pytest.mark.asyncio + async def test_projects_create(self, api_mock): + """Test await client.projects.create().""" + async with AsyncVercelClient(access_token="test-token") as client: + response = await client.projects.create(body={"name": "my-project"}) + + assert response["id"] == "new" + assert response["name"] == "Created" + + @pytest.mark.asyncio + async def test_projects_update(self, api_mock): + """Test await client.projects.update().""" + async with AsyncVercelClient(access_token="test-token") as client: + response = await client.projects.update("123", body={"name": "Updated"}) + + assert response["id"] == "123" + assert response["name"] == "Updated" + + @pytest.mark.asyncio + async def test_projects_delete(self, api_mock): + """Test await client.projects.delete().""" + async with AsyncVercelClient(access_token="test-token") as client: + result = await client.projects.delete("123") + + assert result is None + + @pytest.mark.asyncio + async def test_deployments_create(self, api_mock): + """Test await client.deployments.create().""" + async with AsyncVercelClient(access_token="test-token") as client: + response = await client.deployments.create(body={"name": "my-deployment"}) + + assert response["id"] == "dpl_1" + assert response["url"] == "test.vercel.app" + + @pytest.mark.asyncio + async def test_deployments_upload_file(self, api_mock): + """Test await client.deployments.upload_file().""" + async with AsyncVercelClient(access_token="test-token") as client: + response = await client.deployments.upload_file( + content=b"hello world", + content_length=11, + x_vercel_digest="abc123", + ) + + assert response["url"] == "file-url" + + +class TestClientConfig: + """Test ClientConfig.""" + + def test_resolve_token_from_config(self): + """Test token resolution from config.""" + config = ClientConfig(access_token="my-token") + assert config.resolve_token() == "my-token" + + def test_resolve_token_missing(self, monkeypatch): + """Test token resolution fails when missing.""" + monkeypatch.delenv("VERCEL_TOKEN", raising=False) + config = ClientConfig() + with pytest.raises(RuntimeError, match="Missing Vercel API token"): + config.resolve_token() + + def test_resolve_token_from_env(self, monkeypatch): + """Test token resolution from environment.""" + monkeypatch.setenv("VERCEL_TOKEN", "env-token") + config = ClientConfig() + assert config.resolve_token() == "env-token" + + def test_build_url(self): + """Test URL building.""" + config = ClientConfig(base_url="https://api.vercel.com") + assert config.build_url("/v10/projects") == "https://api.vercel.com/v10/projects" + + +class TestCodeSharing: + """Tests demonstrating that business logic is truly shared.""" + + def test_same_config_structure(self): + """Both clients accept the same config structure.""" + sync_client = VercelClient( + access_token="test", + base_url="https://api.vercel.com", + timeout=30.0, + default_team_id="team_123", + ) + async_client = AsyncVercelClient( + access_token="test", + base_url="https://api.vercel.com", + timeout=30.0, + default_team_id="team_123", + ) + + assert sync_client._config.access_token == async_client._config.access_token + assert sync_client._config.base_url == async_client._config.base_url + assert sync_client._config.timeout == async_client._config.timeout + assert sync_client._config.default_team_id == async_client._config.default_team_id + + sync_client.close() + + def test_sub_clients_share_transport(self): + """Sub-clients share the same transport instance.""" + with VercelClient(access_token="test") as client: + assert client.projects._transport is client.deployments._transport + assert client.projects._transport is client._transport + + +class TestErrorHandling: + """Test error handling.""" + + def test_sync_client_error_response(self): + """Test sync client handles error responses.""" + with respx.mock(base_url="https://api.vercel.com") as mock: + mock.get("/v10/projects").mock( + return_value=httpx.Response(401, json={"error": "Unauthorized"}) + ) + + with pytest.raises(RuntimeError, match="Failed to list projects"): + with VercelClient(access_token="test-token") as client: + client.projects.list() + + @pytest.mark.asyncio + async def test_async_client_error_response(self): + """Test async client handles error responses.""" + with respx.mock(base_url="https://api.vercel.com") as mock: + mock.get("/v10/projects").mock( + return_value=httpx.Response(401, json={"error": "Unauthorized"}) + ) + + with pytest.raises(RuntimeError, match="Failed to list projects"): + async with AsyncVercelClient(access_token="test-token") as client: + await client.projects.list() + + def test_deployments_create_body_validation(self): + """Test deployment body validation is shared.""" + with VercelClient(access_token="test-token") as client: + with pytest.raises(ValueError, match="body must be a dict"): + client.deployments.create(body="not a dict") # type: ignore + + @pytest.mark.asyncio + async def test_async_deployments_create_body_validation(self): + """Test async deployment body validation is shared.""" + async with AsyncVercelClient(access_token="test-token") as client: + with pytest.raises(ValueError, match="body must be a dict"): + await client.deployments.create(body="not a dict") # type: ignore