A robust, real-time task queue system leveraging FastAPI, Dramatiq, and Redis. This project demonstrates how to handle long-running tasks asynchronously and provide instant feedback to clients using Server-Sent Events (SSE).
- Asynchronous Task Processing: Offload heavy computations to Dramatiq workers.
- Real-Time Updates: Instant notification to clients via SSE when tasks change state.
- Robust Queueing: Powered by Redis for reliable task delivery and result storage.
- CORS Support: Configured for modern web application frontends.
- User Authentication: Simple cookie-based authentication for scoped task tracking.
graph TD
A[Client/Frontend] -->|1. POST /send-email| B(FastAPI Server)
B -->|2. Enqueue Task| C[Redis Queue]
C -->|3. Pickup Task| D[Dramatiq Worker]
D -->|4. Process Task| E[SMTP Server/External Service]
D -->|5. Publish Update| F[Redis Pub/Sub]
F -->|6. Fan-out| B
B -->|7. SSE Update| A
- Submission: Client hits
/send-email. FastAPI enqueues a task in Redis. - Processing: Dramatiq worker pulls the task, processes it (e.g., sends an email), and updates Redis Pub/Sub.
- Notification: FastAPI listens to Pub/Sub and pushes real-time events to the connected client.
| File | Description |
|---|---|
main.py |
FastAPI application, SSE logic, and API endpoints. |
tasks.py |
Dramatiq actors and task processing logic. |
models.py |
Pydantic models for request validation. |
index.html |
Simple frontend demonstration for SSE connection. |
.env.example |
Template for environment variables. |
- Python 3.8+
- Redis Server running on
localhost:6379
# Clone the repository
git clone <repository-url>
cd task-queue-project
# Install dependencies
pip install -r requirements.txtCreate a .env file from the example:
cp .env.example .envEdit .env and provide your credentials:
sender_email: Your Gmail address.gmail_password: Your Gmail App Password.redis_backend:redis://localhost:6379/0
You will need three terminal windows:
Window 1: Redis (if not already running)
redis-serverWindow 2: Dramatiq Worker
dramatiq tasksWindow 3: Web Server
uvicorn main:app --port 8000 --reloadOpen index.html in your browser. It automatically sets a user_id cookie and connects to the SSE endpoint.
Send a request using curl:
curl -X POST http://localhost:8000/send-email \
-H "Content-Type: application/json" \
--cookie "user_id=test-user-123" \
-d '{
"email": "user@example.com",
"subject": "Hello from Task Queue!",
"body": "This is a real-time test."
}'fastapidramatiqredisuvicornpython-dotenv
Distributed under the MIT License. See LICENSE for more information.