Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/utils/web_search/.dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
!app.py
!db.py
!auth.py
!daily_usage.py
!requirements-app.txt
!requirements_app.in
!Dockerfile
19 changes: 17 additions & 2 deletions src/utils/web_search/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,23 @@ FIRESTORE_PROJECT_ID=***
FIRESTORE_DATABASE_NAME=***
FIRESTORE_COLLECTION=apiKeys

PBKDF2_ITERATIONS=20000
PBKDF2_SALT_BYTES=16
GEMINI_MAX_ATTEMPTS=1
GEMINI_MAX_BACKOFF_SECONDS=2

API_KEY_PBKDF2_ITERATIONS=20000
API_KEY_PBKDF2_SALT_BYTES=16

API_KEY_CACHE_TTL=30
API_KEY_CACHE_MAX_ITEMS=1024

API_KEY_USAGE_MAX_RETRIES=8
API_KEY_USAGE_BASE_DELAY=0.05
API_KEY_USAGE_MAX_DELAY=1.0

DAILY_USAGE_COLLECTION=dailyUsageCounters
DAILY_USAGE_MAX_RETRIES=8
DAILY_USAGE_BASE_DELAY=0.05
DAILY_USAGE_MAX_DELAY=1.0

GEMINI_GROUNDING_FREE_LIMIT_PRO=1500
GEMINI_GROUNDING_FREE_LIMIT_FLASH=1500
2 changes: 1 addition & 1 deletion src/utils/web_search/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ RUN pip install --no-cache-dir -r requirements-app.txt

RUN mkdir -p /app/src/utils/web_search
RUN touch /app/src/utils/__init__.py
COPY __init__.py app.py auth.py db.py /app/src/utils/web_search/
COPY __init__.py app.py auth.py db.py daily_usage.py /app/src/utils/web_search/

ENV PYTHONPATH=/app/src
CMD ["uvicorn", "utils.web_search.app:app", "--host", "0.0.0.0", "--port", "8080"]
6 changes: 6 additions & 0 deletions src/utils/web_search/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ gcloud auth configure-docker "$REGION-docker.pkg.dev"
| `GEMINI_API_KEY` | Gemini API key used by the proxy | _(required)_ |
| `GEMINI_MAX_ATTEMPTS`, `GEMINI_MAX_BACKOFF_SECONDS` | Retry tuning | `5`, `10` |
| `API_KEY_CACHE_TTL`, `API_KEY_CACHE_MAX_ITEMS` | Auth cache tuning | `30`, `1024` |
| `DAILY_USAGE_COLLECTION` | Collection that stores per-day usage counters | `dailyUsageCounters` |
| `DAILY_USAGE_MAX_RETRIES`, `DAILY_USAGE_BASE_DELAY`, `DAILY_USAGE_MAX_DELAY` | Daily usage retry tuning | `8`, `0.05`, `1.0` |
| `GEMINI_GROUNDING_FREE_LIMIT_PRO` | Daily free allowance for `gemini-2.5-pro` | `1500` |
| `GEMINI_GROUNDING_FREE_LIMIT_FLASH` | Shared daily free allowance for Flash/Flash-Lite | `1500` |

Keep `.env.example` up to date so teammates can copy it into their own `.env`.

Expand Down Expand Up @@ -81,6 +85,8 @@ Keep `.env.example` up to date so teammates can copy it into their own `.env`.
export FIRESTORE_DATABASE_NAME=grounding
export FIRESTORE_EMULATOR_HOST=0.0.0.0:8922
export GEMINI_API_KEY="dev-placeholder"
export GEMINI_GROUNDING_FREE_LIMIT_PRO=1500
export GEMINI_GROUNDING_FREE_LIMIT_FLASH=1500
```

4. **Install Python dependencies**
Expand Down
191 changes: 151 additions & 40 deletions src/utils/web_search/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
InactiveAPIKeyError,
InvalidAPIKeyError,
)
from .daily_usage import DailyUsageRepository
from .db import APIKeyRecord, APIKeyRepository, UsageLimitExceededError


Expand All @@ -37,6 +38,58 @@
FIRESTORE_COLLECTION = os.getenv("FIRESTORE_COLLECTION", "apiKeys")
API_KEY_CACHE_TTL = int(os.getenv("API_KEY_CACHE_TTL", "30"))
API_KEY_CACHE_MAX_ITEMS = int(os.getenv("API_KEY_CACHE_MAX_ITEMS", "1024"))
# Fallback free allowances, used by ``_parse_free_limit`` when the matching
# ``GEMINI_GROUNDING_FREE_LIMIT_*`` environment variable is unset, empty, or
# not a valid integer.
FREE_LIMIT_DEFAULT_PRO = 1500
FREE_LIMIT_DEFAULT_FLASH = 1500


def _parse_free_limit(env_var: str, default: int) -> int:
    """Read a non-negative integer allowance from an environment variable.

    Parameters
    ----------
    env_var : str
        Name of the environment variable to read.
    default : int
        Value returned when the variable is unset, empty, or not an integer.

    Returns
    -------
    int
        The parsed value, ``default`` when the variable is missing or
        malformed, or ``0`` when the variable holds a negative number.
        Malformed and negative values are reported through a warning log.
    """
    raw = os.getenv(env_var)
    if not raw:
        # Unset and empty are treated the same: silently use the default.
        return default

    try:
        limit = int(raw)
    except ValueError:
        logger.warning(
            "Invalid value '%s' for %s; falling back to %d",
            raw,
            env_var,
            default,
        )
        return default

    if limit >= 0:
        return limit

    # Negative allowances make no sense; clamp to zero rather than fall back.
    logger.warning(
        "Negative value '%s' for %s; treating as 0",
        raw,
        env_var,
    )
    return 0


# Maps a requested model name to the usage bucket its calls are counted in.
# Flash and Flash-Lite deliberately share one bucket, so they draw from the
# same free allowance. Models not listed here are handled by
# ``_resolve_usage_bucket``, which falls back to the model name itself.
MODEL_TO_USAGE_BUCKET: dict[str, str] = {
"gemini-2.5-pro": "gemini-2.5-pro",
"gemini-2.5-flash": "gemini-2.5-flash-family",
"gemini-2.5-flash-lite": "gemini-2.5-flash-family",
}

# Free allowance per usage bucket. The values are read from the environment
# once, when this module is imported, so changing the variables afterwards
# has no effect on this table.
BUCKET_FREE_LIMITS: dict[str, int] = {
"gemini-2.5-pro": _parse_free_limit(
"GEMINI_GROUNDING_FREE_LIMIT_PRO",
FREE_LIMIT_DEFAULT_PRO,
),
"gemini-2.5-flash-family": _parse_free_limit(
"GEMINI_GROUNDING_FREE_LIMIT_FLASH",
FREE_LIMIT_DEFAULT_FLASH,
),
}


def _resolve_usage_bucket(model: str) -> tuple[str, int]:
    """Look up the usage bucket for ``model`` and that bucket's free allowance.

    A model missing from ``MODEL_TO_USAGE_BUCKET`` is counted in a bucket
    named after the model itself, and a bucket missing from
    ``BUCKET_FREE_LIMITS`` has a free allowance of ``0``.
    """
    usage_bucket = MODEL_TO_USAGE_BUCKET.get(model, model)
    free_allowance = BUCKET_FREE_LIMITS.get(usage_bucket, 0)
    return usage_bucket, free_allowance


RETRYABLE_EXCEPTIONS: tuple[type[Exception], ...] = (
google_exceptions.ResourceExhausted,
Expand Down Expand Up @@ -189,6 +242,13 @@ async def startup_event() -> None:
cache_ttl_seconds=API_KEY_CACHE_TTL,
cache_max_items=API_KEY_CACHE_MAX_ITEMS,
)
app.state.daily_usage_repository = DailyUsageRepository(
firestore_client,
collection_name=os.getenv(
"DAILY_USAGE_COLLECTION",
"dailyUsageCounters",
),
)


async def shutdown_event() -> None:
Expand Down Expand Up @@ -235,6 +295,18 @@ def get_authenticator() -> APIKeyAuthenticator:
return authenticator


def get_daily_usage_repository() -> DailyUsageRepository:
    """Fetch the daily usage repository that was attached to ``app.state``.

    Raises
    ------
    RuntimeError
        If no repository is present on the application state (the attribute
        is missing or ``None``).
    """
    repo: DailyUsageRepository | None = getattr(
        app.state, "daily_usage_repository", None
    )
    if repo is not None:
        return repo
    raise RuntimeError("Daily usage repository has not been initialised")


async def _authenticate_request(
api_key_header: str,
authenticator: APIKeyAuthenticator,
Expand Down Expand Up @@ -269,37 +341,6 @@ async def _authenticate_request(
) from exc


async def require_api_key(
    api_key_header: Annotated[str, Header(alias="X-API-Key")],
    authenticator: Annotated[APIKeyAuthenticator, Depends(get_authenticator)],
) -> APIKeyRecord:
    """Authenticate the caller and consume one unit of API key usage.

    Thin dependency wrapper that delegates to ``_authenticate_request`` with
    ``consume_usage=True``.

    Parameters
    ----------
    api_key_header : str
        Value of the ``X-API-Key`` request header.
    authenticator : APIKeyAuthenticator
        Authenticator used to validate the key and reserve usage.

    Returns
    -------
    APIKeyRecord
        The record returned by ``_authenticate_request``.

    Raises
    ------
    HTTPException
        Propagated from ``_authenticate_request`` when the key is rejected.
    """
    record = await _authenticate_request(
        api_key_header,
        authenticator,
        consume_usage=True,
    )
    return record


async def require_api_key_without_consumption(
api_key_header: Annotated[str, Header(alias="X-API-Key")],
authenticator: Annotated[APIKeyAuthenticator, Depends(get_authenticator)],
Expand Down Expand Up @@ -405,7 +446,9 @@ async def call_gemini_with_retry(request: RequestBody) -> types.GenerateContentR
)
except RETRYABLE_EXCEPTIONS as exc:
if attempt >= MAX_GEMINI_ATTEMPTS:
logger.exception("Gemini request failed after retries")
logger.exception(
"Gemini request failed after %d retries", MAX_GEMINI_ATTEMPTS
)
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail="Gemini is currently unavailable",
Expand Down Expand Up @@ -444,25 +487,93 @@ async def health() -> dict[str, str]:
async def _release_daily_reservation(
    daily_usage: DailyUsageRepository,
    reservation: object,
    bucket: str,
) -> None:
    """Best-effort rollback of a daily usage reservation.

    A failure to release is logged instead of raised, so the error that
    triggered the rollback is the one that reaches the client.
    """
    try:
        await daily_usage.release(reservation)
    except Exception:  # pragma: no cover - defensive logging for rollbacks
        logger.exception(
            "Failed to roll back daily usage for bucket %s",
            bucket,
        )


@router.post("/v1/grounding_with_search")
async def search(
    request: RequestBody,
    record: Annotated[
        APIKeyRecord,
        Depends(require_api_key_without_consumption),
    ],
    authenticator: Annotated[APIKeyAuthenticator, Depends(get_authenticator)],
    daily_usage: Annotated[
        DailyUsageRepository,
        Depends(get_daily_usage_repository),
    ],
) -> dict[str, object]:
    """Proxy Gemini grounding requests with quota enforcement.

    A daily usage slot is reserved for the model's usage bucket first. If the
    reservation reports ``consumed_free``, the API key's own usage counter is
    left untouched; otherwise one unit of key usage is consumed. Everything
    that was reserved or consumed is rolled back when a later step fails.

    Parameters
    ----------
    request : RequestBody
        Payload describing the Gemini call.
    record : APIKeyRecord
        API key record produced by ``require_api_key_without_consumption``
        (the key is validated but no usage has been consumed yet).
    authenticator : APIKeyAuthenticator
        Authenticator used to consume key usage and to roll it back on error.
    daily_usage : DailyUsageRepository
        Repository used to reserve and release the per-bucket daily counter.

    Returns
    -------
    dict of str to object
        JSON serialisable response returned by the Gemini model.

    Raises
    ------
    HTTPException
        ``401`` when the key is no longer recognised, ``403`` when it is
        inactive, expired, or over its usage limit. Errors raised by
        ``call_gemini_with_retry`` are re-raised after the rollback.
    """
    bucket, free_limit = _resolve_usage_bucket(request.model)
    reservation = await daily_usage.reserve(bucket, free_limit)
    consumed_api_quota = False

    if not reservation.consumed_free:
        # Every failure branch releases the daily reservation first, so a
        # rejected request never leaves the daily counter incremented.
        try:
            record = await authenticator.consume_usage(record.lookup_hash)
        except UsageLimitExceededError as exc:
            await _release_daily_reservation(daily_usage, reservation, bucket)
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="API key usage limit exceeded",
            ) from exc
        except InvalidAPIKeyError as exc:
            await _release_daily_reservation(daily_usage, reservation, bucket)
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid API key provided",
            ) from exc
        except InactiveAPIKeyError as exc:
            await _release_daily_reservation(daily_usage, reservation, bucket)
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="API key is inactive",
            ) from exc
        except ExpiredAPIKeyError as exc:
            await _release_daily_reservation(daily_usage, reservation, bucket)
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="API key has expired",
            ) from exc
        except Exception:
            # Unexpected failure (e.g. a datastore error): still release the
            # daily reservation, then let the original error propagate.
            await _release_daily_reservation(daily_usage, reservation, bucket)
            raise

        consumed_api_quota = True

    try:
        response = await call_gemini_with_retry(request)
    except Exception:
        await _release_daily_reservation(daily_usage, reservation, bucket)

        if consumed_api_quota:
            try:
                await authenticator.release_usage(record.lookup_hash)
            except Exception:  # pragma: no cover - defensive logging for rollbacks
                logger.exception(
                    "Failed to roll back usage for API key %s", record.lookup_hash
                )
        raise

    logger.info(
        "Gemini request completed for model %s (bucket=%s, consumed_free=%s)",
        request.model,
        bucket,
        reservation.consumed_free,
    )
    return response.to_json_dict()


Expand Down
52 changes: 52 additions & 0 deletions src/utils/web_search/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,58 @@ async def reserve_usage(

return record

async def consume_usage(self, lookup_hash: str) -> APIKeyRecord:
    """Increment the usage counter for a previously validated API key.

    The record is taken from the cache when present, otherwise loaded from
    the repository and cached. The key must be active and unexpired before
    the repository's usage counter is updated.

    Parameters
    ----------
    lookup_hash : str
        Lookup hash identifying the API key.

    Returns
    -------
    APIKeyRecord
        Record returned by the repository after the counter update; it is
        also written back to the cache.

    Raises
    ------
    InvalidAPIKeyError
        If the repository does not know the key.
    InactiveAPIKeyError
        If the record's status is not ``"active"``.
    ExpiredAPIKeyError
        If the record has an expiry time that the clock has reached.
    """
    cached = self._cache_lookup(lookup_hash)
    if cached:
        record = cached
    else:
        try:
            record = await self._repository.get_api_key(lookup_hash)
        except APIKeyNotFoundError as exc:
            raise InvalidAPIKeyError("API key not recognised") from exc
        self._cache_store(record)

    if record.status != "active":
        raise InactiveAPIKeyError("API key has been suspended")

    if record.expires_at and self._clock() >= record.expires_at:
        # Drop the stale entry so the next lookup goes to the repository.
        self._cache.pop(lookup_hash, None)
        raise ExpiredAPIKeyError("API key has expired")

    try:
        refreshed = await self._repository.update_usage_counter(lookup_hash)
    except APIKeyNotFoundError as exc:
        self._cache.pop(lookup_hash, None)
        raise InvalidAPIKeyError("API key not recognised") from exc
    else:
        self._cache_store(refreshed)
        return refreshed

async def release_usage(self, lookup_hash: str) -> APIKeyRecord:
    """Undo one previously consumed usage slot for an API key.

    Parameters
    ----------
    lookup_hash : str
        Lookup hash of the API key whose usage counter should be
        decremented.

    Returns
    -------
    APIKeyRecord
        Record returned by the repository after the decrement; it is also
        written back to the cache.

    Raises
    ------
    InvalidAPIKeyError
        If the repository does not know the key. The cached entry, if any,
        is evicted first.
    """
    repository = self._repository
    try:
        refreshed = await repository.decrement_usage_counter(lookup_hash)
    except APIKeyNotFoundError as exc:  # pragma: no cover - defensive branch
        self._cache.pop(lookup_hash, None)
        raise InvalidAPIKeyError("API key not recognised") from exc
    else:
        self._cache_store(refreshed)
        return refreshed

async def create_api_key(
self,
*,
Expand Down
Loading