init: boilerplate and helper setup #3
|
|
@@ -2,8 +2,8 @@ | |
|
|
||
| ## Requirements | ||
|
|
||
| * [Docker](https://www.docker.com/). | ||
| * [uv](https://docs.astral.sh/uv/) for Python package and environment management. | ||
| - [Docker](https://www.docker.com/). | ||
| - [uv](https://docs.astral.sh/uv/) for Python package and environment management. | ||
|
|
||
| ## Docker Compose | ||
|
|
||
|
|
@@ -127,23 +127,23 @@ As during local development your app directory is mounted as a volume inside the | |
|
|
||
| Make sure you create a "revision" of your models and "upgrade" your database with that revision every time you change them, as this is what updates the tables in your database. Otherwise, your application will have errors. | ||
|
|
||
| * Start an interactive session in the backend container: | ||
| - Start an interactive session in the backend container: | ||
|
|
||
| ```console | ||
| $ docker compose exec backend bash | ||
| ``` | ||
|
|
||
| * Alembic is already configured to import your SQLModel models from `./backend/app/models.py`. | ||
| - Alembic is already configured to import your SQLModel models from `./backend/app/models.py`. | ||
|
|
||
| * After changing a model (for example, adding a column), inside the container, create a revision, e.g.: | ||
| - After changing a model (for example, adding a column), inside the container, create a revision, e.g.: | ||
|
|
||
| ```console | ||
| $ alembic revision --autogenerate -m "Add column last_name to User model" | ||
| ``` | ||
|
|
||
| * Commit to the git repository the files generated in the alembic directory. | ||
| - Commit to the git repository the files generated in the alembic directory. | ||
|
|
||
| * After creating the revision, run the migration in the database (this is what will actually change the database): | ||
| - After creating the revision, run the migration in the database (this is what will actually change the database): | ||
|
|
||
| ```console | ||
| $ alembic upgrade head | ||
|
|
@@ -170,3 +170,44 @@ The email templates are in `./backend/app/email-templates/`. Here, there are two | |
| Before continuing, ensure you have the [MJML extension](https://marketplace.visualstudio.com/items?itemName=attilabuti.vscode-mjml) installed in your VS Code. | ||
|
|
||
| Once you have the MJML extension installed, you can create a new email template in the `src` directory. After creating the new email template and with the `.mjml` file open in your editor, open the command palette with `Ctrl+Shift+P` and search for `MJML: Export to HTML`. This will convert the `.mjml` file to a `.html` file and now you can save it in the build directory. | ||
|
|
||
| ## Background Tasks (Celery) and Upstash Redis | ||
|
|
||
| This project supports running background tasks using Celery with Redis as the | ||
| broker and result backend. You can use a local Redis (via Docker Compose) or a | ||
| hosted provider such as Upstash. The project reads these settings from the | ||
| environment via `app.core.config.Settings`. | ||
|
|
||
| - **Configure via `.env` or environment**: set either `REDIS_URL` (recommended) | ||
| or `CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND` explicitly. For Upstash | ||
| use the `rediss://` URL provided by Upstash (it contains the host and token). | ||
|
|
||
| Example `.env` entries for Upstash (replace with your values): | ||
|
|
||
| ``` | ||
| REDIS_URL=rediss://default:<password>@<host>.upstash.io:6379 | ||
| # or explicit celery vars | ||
| CELERY_BROKER_URL=rediss://default:<password>@<host>.upstash.io:6379 | ||
| CELERY_RESULT_BACKEND=rediss://default:<password>@<host>.upstash.io:6379 | ||
| ``` | ||
|
|
||
| - **Run worker (recommended)**: from the `backend/` directory either use the | ||
| Celery CLI or the lightweight Python entrypoint: | ||
|
|
||
| ``` | ||
| # using Celery CLI (preferred) | ||
| celery -A app.core.celery_app.celery_app worker --loglevel=info | ||
|
|
||
| # quick start via python entrypoint (run from the `backend/` directory) | ||
| # module form: | ||
| python -m app.workers.celery_worker | ||
| ``` | ||
|
|
||
| - **Test a task**: from a terminal with your virtualenv activated: | ||
|
|
||
| ``` | ||
| python -c "from app.workers import add; res = add.delay(2,3); print(res.get(timeout=10))" | ||
| ``` | ||
|
|
||
| The example tasks are in `app/tasks.py`. Replace `send_welcome_email` with | ||
| your real email sending logic to run it asynchronously. | ||
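For reference, a task module compatible with the test command above could look like the following minimal sketch. The `add` and `send_welcome_email` names mirror the examples referenced in this README, but the exact module layout (`app/tasks.py` vs. the `app.workers` package that Celery autodiscovers) is an assumption here, not part of the diff.

```python
# Hypothetical task module (e.g. app/workers/__init__.py or app/tasks.py) --
# a sketch only, assuming tasks register against the shared Celery instance.
from app.core.celery_app import celery_app


@celery_app.task
def add(x: int, y: int) -> int:
    # Trivial task used to verify the broker/worker round trip.
    return x + y


@celery_app.task
def send_welcome_email(email_to: str) -> None:
    # Placeholder: swap in the project's real email-sending logic so it runs
    # asynchronously on the worker instead of in the request handler.
    print(f"Would send welcome email to {email_to}")
```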
| @@ -0,0 +1,32 @@ | ||
| **WebSocket Infrastructure**: Quick setup | ||
|
|
||
| - **Purpose**: Provide real-time sync across connected clients and across multiple app instances using Redis pub/sub. | ||
| - **Components**: | ||
|
|
||
| - `app.api.websocket_manager.WebSocketManager`: manages local WebSocket connections and subscribes to Redis channels `ws:{room}`. | ||
| - `app.api.routes.ws`: WebSocket endpoint at `GET /api/v1/ws/{room}` (path under API prefix). | ||
| - Uses existing Redis client configured via `REDIS_URL` in `app.core.config.Settings`. | ||
|
|
||
| - **How it works**: | ||
|
|
||
| - Each connected client opens a WebSocket to `/api/v1/ws/{room}`. | ||
| - When a client sends a text message, the endpoint publishes the message to Redis channel `ws:{room}`. | ||
| - The `WebSocketManager` subscribes to `ws:*` and forwards published messages to all local WebSocket connections in the given room. | ||
| - This allows multiple app instances to broadcast to each other's connected clients. | ||
|
|
||
| - **Env / Config**: | ||
|
|
||
| - Ensure `REDIS_URL` is configured in the project's environment (default: `redis://redis:6379/0`). The WebSocket route reads the manager from `app.state.ws_manager`; a sketch of how it might be wired at startup follows this section. | ||
|
|
||
| - **Frontend example** (browser JS): | ||
|
|
||
| ```js | ||
| const ws = new WebSocket(`wss://your-backend.example.com/api/v1/ws/room-123`); | ||
| ws.addEventListener("message", (ev) => console.log("msg", ev.data)); | ||
| ws.addEventListener("open", () => ws.send(JSON.stringify({ type: "hello" }))); | ||
| ``` | ||
|
|
||
| - **Notes & next steps**: | ||
| - Messages are sent/received as plain text; consider JSON schema enforcement and auth. | ||
| - Add authentication (JWT in query param/header) and room access checks as needed. | ||
| - Consider rate limiting and maximum connections per client. |
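The diff does not show where `app.state.ws_manager` is created. A minimal sketch of one way to wire it up at startup, assuming a `redis.asyncio` client (redis-py 5.x) and FastAPI's lifespan API, might look like this:

```python
# Hypothetical startup wiring -- a sketch only; the real application factory
# is not part of this diff.
from contextlib import asynccontextmanager

import redis.asyncio as redis
from fastapi import FastAPI

from app.api.websocket_manager import WebSocketManager
from app.core.config import settings


@asynccontextmanager
async def lifespan(app: FastAPI):
    redis_client = redis.from_url(settings.REDIS_URL, decode_responses=True)
    app.state.ws_manager = WebSocketManager(redis_client)
    await app.state.ws_manager.start()  # begins listening on ws:* channels
    yield
    await app.state.ws_manager.stop()  # cancels the listener task
    await redis_client.aclose()  # aclose() assumes redis-py >= 5


app = FastAPI(lifespan=lifespan)
```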
| @@ -0,0 +1,21 @@ | ||
| from fastapi import APIRouter, WebSocket, WebSocketDisconnect | ||
|
|
||
| router = APIRouter() | ||
|
|
||
|
|
||
| @router.websocket("/ws/{room}") | ||
| async def websocket_endpoint(websocket: WebSocket, room: str): | ||
| """Simple WebSocket endpoint that forwards client messages to Redis | ||
| and receives published messages via the WebSocketManager (attached to | ||
| the app state) to broadcast to local clients. | ||
| """ | ||
| manager = websocket.app.state.ws_manager | ||
| await manager.connect(websocket, room) | ||
| try: | ||
| while True: | ||
| # receive text from client and publish to Redis so other instances | ||
| # receive it and forward to their connected clients | ||
| data = await websocket.receive_text() | ||
| await manager.publish(room, data) | ||
| except WebSocketDisconnect: | ||
| await manager.disconnect(websocket, room) | ||
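As a quick manual check against a running backend, a small client script could connect to a room, send a message, and print what comes back. This sketch assumes the backend is reachable at `localhost:8000` and uses the third-party `websockets` package; any WebSocket client would do.

```python
# Manual test client -- a sketch, not part of the PR.
# Requires: pip install websockets
import asyncio

import websockets


async def main() -> None:
    uri = "ws://localhost:8000/api/v1/ws/room-123"
    async with websockets.connect(uri) as ws:
        # The endpoint publishes this text to the Redis channel ws:room-123 ...
        await ws.send("hello from a test client")
        # ... and the manager broadcasts it to every local connection in the
        # room, including this one, so we should get our own message back.
        print(await ws.recv())


asyncio.run(main())
```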
| @@ -0,0 +1,102 @@ | ||
| import asyncio | ||
| import logging | ||
| from typing import Dict, Set | ||
|
|
||
| from fastapi import WebSocket | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class WebSocketManager: | ||
| """Manage WebSocket connections and Redis pub/sub bridging. | ||
|
|
||
| - Keeps in-memory mapping of rooms -> WebSocket connections for local broadcasts. | ||
| - Subscribes to Redis channels `ws:{room}` and broadcasts published messages | ||
| to local connections so multiple app instances stay in sync. | ||
| """ | ||
|
|
||
| def __init__(self, redis_client): | ||
| self.redis = redis_client | ||
| self.connections: Dict[str, Set[WebSocket]] = {} | ||
| self._pubsub = None | ||
| self._listen_task: asyncio.Task | None = None | ||
|
|
||
| async def start(self) -> None: | ||
|
✅ Excellent architectural choice! Using Redis pub/sub for WebSocket communication enables horizontal scaling: it allows multiple backend instances to share WebSocket messages, which is critical for production deployments behind load balancers. Note: it just needs an authentication layer (see the security comments below).
||
| try: | ||
| self._pubsub = self.redis.pubsub() | ||
| # Subscribe to all ws channels using pattern subscription | ||
| await self._pubsub.psubscribe("ws:*") | ||
| self._listen_task = asyncio.create_task(self._reader_loop()) | ||
| logger.info("WebSocketManager redis listener started") | ||
| except Exception as e: | ||
| logger.warning(f"WebSocketManager start failed: {e}") | ||
|
|
||
| async def _reader_loop(self) -> None: | ||
| try: | ||
| async for message in self._pubsub.listen(): | ||
| if not message: | ||
| continue | ||
| mtype = message.get("type") | ||
| # handle pmessage (pattern) and message | ||
| if mtype not in ("pmessage", "message"): | ||
| continue | ||
| # redis.asyncio returns bytes for channel/data in some setups | ||
| channel = message.get("channel") or message.get("pattern") | ||
| data = message.get("data") | ||
| if isinstance(channel, (bytes, bytearray)): | ||
| channel = channel.decode() | ||
| if isinstance(data, (bytes, bytearray)): | ||
| data = data.decode() | ||
| # channel format: ws:<room> | ||
| try: | ||
|
Using bare `except` here silently swallows malformed channel names and any other parsing error. Fix: add specific exception handling with logging:

```python
try:
    room = str(channel).split("ws:", 1)[1]
except IndexError:
    # "message" is a reserved LogRecord attribute, so pass the payload under a different key.
    logger.error(f"Invalid channel format: {channel}", extra={"pubsub_message": message})
    continue
except Exception as e:
    logger.exception(f"Unexpected error: {e}", extra={"pubsub_message": message})
    # Send to error tracking (Sentry, etc.)
    continue
```
||
| room = str(channel).split("ws:", 1)[1] | ||
| except Exception: | ||
| continue | ||
| await self._broadcast_to_local(room, data) | ||
| except asyncio.CancelledError: | ||
| logger.info("WebSocketManager listener task cancelled") | ||
| except Exception as e: | ||
| logger.exception(f"WebSocketManager listener error: {e}") | ||
|
|
||
| async def publish(self, room: str, message: str) -> None: | ||
| try: | ||
| await self.redis.publish(f"ws:{room}", message) | ||
| except Exception as e: | ||
| logger.warning(f"Failed to publish websocket message: {e}") | ||
|
|
||
| async def connect(self, websocket: WebSocket, room: str) -> None: | ||
| await websocket.accept() | ||
| self.connections.setdefault(room, set()).add(websocket) | ||
|
|
||
| async def disconnect(self, websocket: WebSocket, room: str) -> None: | ||
| conns = self.connections.get(room) | ||
| if not conns: | ||
| return | ||
| conns.discard(websocket) | ||
| if not conns: | ||
| self.connections.pop(room, None) | ||
|
|
||
| async def send_personal(self, websocket: WebSocket, message: str) -> None: | ||
| await websocket.send_text(message) | ||
|
|
||
| async def _broadcast_to_local(self, room: str, message: str) -> None: | ||
| conns = list(self.connections.get(room, [])) | ||
| for ws in conns: | ||
| try: | ||
| await ws.send_text(message) | ||
| except Exception: | ||
| # ignore send errors; disconnect will clean up | ||
| pass | ||
|
|
||
| async def stop(self) -> None: | ||
| if self._listen_task: | ||
| self._listen_task.cancel() | ||
| try: | ||
| await self._listen_task | ||
| except Exception: | ||
| pass | ||
| if self._pubsub: | ||
| try: | ||
| await self._pubsub.close() | ||
| except Exception: | ||
| pass | ||
| @@ -0,0 +1,28 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from celery import Celery | ||
|
|
||
| from .config import settings | ||
|
|
||
|
|
||
| broker_url = settings.CELERY_BROKER_URL or settings.REDIS_URL | ||
| result_backend = settings.CELERY_RESULT_BACKEND or settings.REDIS_URL | ||
|
|
||
|
|
||
| celery_app = Celery( | ||
| settings.PROJECT_NAME if getattr(settings, "PROJECT_NAME", None) else "app", | ||
| broker=broker_url, | ||
| backend=result_backend, | ||
| ) | ||
|
|
||
|
|
||
| celery_app.conf.update( | ||
| result_expires=3600, | ||
| task_serializer="json", | ||
| result_serializer="json", | ||
| accept_content=["json"], | ||
| timezone="UTC", | ||
| enable_utc=True, | ||
| ) | ||
|
|
||
| celery_app.autodiscover_tasks(["app.workers"]) |
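The README above refers to `python -m app.workers.celery_worker` as a quick-start entrypoint. That module is not shown in this diff; a minimal sketch of what it might contain, assuming it simply delegates to the shared Celery instance, is:

```python
# Hypothetical app/workers/celery_worker.py -- a sketch only.
# Invoked as: python -m app.workers.celery_worker
from app.core.celery_app import celery_app

if __name__ == "__main__":
    # Equivalent to: celery -A app.core.celery_app.celery_app worker --loglevel=info
    celery_app.worker_main(["worker", "--loglevel=info"])
```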
|
|
@@ -55,6 +55,21 @@ def all_cors_origins(self) -> list[str]: | |
| POSTGRES_USER: str | ||
| POSTGRES_PASSWORD: str = "" | ||
| POSTGRES_DB: str = "" | ||
| # Redis connection URL. Default points to the compose service `redis`. | ||
| REDIS_URL: str = "redis://redis:6379/0" | ||
| # Celery broker/result backend. By default reuse `REDIS_URL` so you can | ||
| # configure an Upstash or other hosted Redis via `REDIS_URL` or explicitly | ||
| # via `CELERY_BROKER_URL` / `CELERY_RESULT_BACKEND` env vars. | ||
| CELERY_BROKER_URL: str | None = None | ||
| CELERY_RESULT_BACKEND: str | None = None | ||
|
|
||
| # Cloudflare R2 (S3 compatible) settings | ||
| R2_ENABLED: bool = False | ||
| R2_ACCOUNT_ID: str | None = None | ||
| R2_ACCESS_KEY_ID: str | None = None | ||
| R2_SECRET_ACCESS_KEY: str | None = None | ||
| R2_BUCKET: str | None = None | ||
| R2_ENDPOINT_URL: AnyUrl | None = None | ||
|
|
||
| @computed_field # type: ignore[prop-decorator] | ||
| @property | ||
|
|
@@ -90,6 +105,39 @@ def _set_default_emails_from(self) -> Self: | |
| def emails_enabled(self) -> bool: | ||
| return bool(self.SMTP_HOST and self.EMAILS_FROM_EMAIL) | ||
|
|
||
| @computed_field # type: ignore[prop-decorator] | ||
| @property | ||
| def r2_endpoint(self) -> str | None: | ||
| """Return explicit endpoint URL if set, otherwise construct from account id.""" | ||
| if self.R2_ENDPOINT_URL: | ||
| return str(self.R2_ENDPOINT_URL) | ||
| if self.R2_ACCOUNT_ID: | ||
| return f"https://{self.R2_ACCOUNT_ID}.r2.cloudflarestorage.com" | ||
| return None | ||
|
|
||
| @computed_field # type: ignore[prop-decorator] | ||
| @property | ||
| def r2_enabled(self) -> bool: | ||
| """Whether R2 integration is configured/enabled.""" | ||
| if not self.R2_ENABLED: | ||
| return False | ||
| return bool(self.R2_BUCKET and self.R2_ACCESS_KEY_ID and self.R2_SECRET_ACCESS_KEY) | ||
|
|
||
| @computed_field # type: ignore[prop-decorator] | ||
| @property | ||
| def r2_boto3_config(self) -> dict[str, Any]: | ||
| """Return a dict of kwargs suitable for boto3/aioboto3 client creation.""" | ||
| if not self.r2_enabled: | ||
| return {} | ||
| cfg: dict[str, Any] = { | ||
| "aws_access_key_id": self.R2_ACCESS_KEY_ID, | ||
| "aws_secret_access_key": self.R2_SECRET_ACCESS_KEY, | ||
| } | ||
| endpoint = self.r2_endpoint | ||
| if endpoint: | ||
| cfg["endpoint_url"] = endpoint | ||
| return cfg | ||
|
|
||
| EMAIL_TEST_USER: EmailStr = "[email protected]" | ||
| FIRST_SUPERUSER: EmailStr | ||
| FIRST_SUPERUSER_PASSWORD: str | ||
|
|
||
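For reference, the computed `r2_boto3_config` property above could be consumed roughly as follows when creating an S3-compatible client for Cloudflare R2. This is a sketch under the assumption that boto3 is installed and the R2 variables are set; it is not part of the diff.

```python
# Sketch of consuming settings.r2_boto3_config (assumes boto3 is available).
import boto3

from app.core.config import settings

if settings.r2_enabled:
    # r2_boto3_config carries the access key, secret key and endpoint_url
    # (derived from R2_ACCOUNT_ID when R2_ENDPOINT_URL is not set explicitly).
    s3 = boto3.client("s3", **settings.r2_boto3_config)
    s3.upload_file("local.txt", settings.R2_BUCKET, "uploads/local.txt")
```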
🚨 BLOCKER: Critical security vulnerability - no authentication on the WebSocket endpoint. See the main review for the full list of security issues and the required fix, including a complete implementation example.