🚀 Async Task Queue with SSE & Dramatiq


A robust, real-time task queue system leveraging FastAPI, Dramatiq, and Redis. This project demonstrates how to handle long-running tasks asynchronously and provide instant feedback to clients using Server-Sent Events (SSE).


✨ Features

  • Asynchronous Task Processing: Offload heavy computations to Dramatiq workers.
  • Real-Time Updates: Instant notification to clients via SSE when tasks change state.
  • Robust Queueing: Powered by Redis for reliable task delivery and result storage.
  • CORS Support: Configured for modern web application frontends.
  • User Authentication: Simple cookie-based identification (a user_id cookie) that scopes task tracking to each client.

🏗️ Architecture

graph TD
    A[Client/Frontend] -->|1. POST /send-email| B(FastAPI Server)
    B -->|2. Enqueue Task| C[Redis Queue]
    C -->|3. Pickup Task| D[Dramatiq Worker]
    D -->|4. Process Task| E[SMTP Server/External Service]
    D -->|5. Publish Update| F[Redis Pub/Sub]
    F -->|6. Fan-out| B
    B -->|7. SSE Update| A
  1. Submission: Client hits /send-email. FastAPI enqueues a task in Redis.
  2. Processing: A Dramatiq worker pulls the task, processes it (e.g., sends an email), and publishes a status update to Redis Pub/Sub.
  3. Notification: FastAPI subscribes to Pub/Sub and pushes each update to the connected client over SSE.
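The publish, fan-out, and SSE steps above can be sketched without any external services. This is a minimal illustration, not the project's code: `InMemoryBroker` is a hypothetical stand-in for Redis Pub/Sub, and the channel name `updates:<user_id>`, the `task_update` event name, and the payload fields are all assumptions made for the example.

```python
import asyncio
import json
from collections import defaultdict


def format_sse(data: dict, event: str = "task_update") -> str:
    """Serialize one event in the text/event-stream wire format."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


class InMemoryBroker:
    """Hypothetical stand-in for Redis Pub/Sub: a set of queues per channel."""

    def __init__(self) -> None:
        self._subscribers = defaultdict(set)

    def subscribe(self, channel: str) -> asyncio.Queue:
        queue = asyncio.Queue()
        self._subscribers[channel].add(queue)
        return queue

    async def publish(self, channel: str, message: dict) -> int:
        # Fan out: every subscriber of the channel receives the message.
        for queue in self._subscribers[channel]:
            await queue.put(message)
        return len(self._subscribers[channel])


async def demo() -> list:
    broker = InMemoryBroker()
    channel = "updates:test-user-123"  # assumed per-user channel name
    inbox = broker.subscribe(channel)  # what an SSE endpoint would hold open

    # What a worker would do whenever a task changes state.
    await broker.publish(channel, {"task_id": "abc", "status": "processing"})
    await broker.publish(channel, {"task_id": "abc", "status": "done"})

    # What the web server would write to the open HTTP response.
    frames = []
    while not inbox.empty():
        frames.append(format_sse(inbox.get_nowait()))
    return frames


frames = asyncio.run(demo())
print("".join(frames), end="")
```

In the real system the broker role is played by Redis, so the worker and the web server can run in separate processes; the framing of each event on the wire stays the same.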

📂 Project Structure

| File | Description |
| --- | --- |
| main.py | FastAPI application, SSE logic, and API endpoints. |
| tasks.py | Dramatiq actors and task processing logic. |
| models.py | Pydantic models for request validation. |
| index.html | Simple frontend demonstration for SSE connection. |
| .env.example | Template for environment variables. |

🚀 Getting Started

Prerequisites

  • Python 3.8+
  • Redis Server running on localhost:6379

1. Installation

# Clone the repository
git clone <repository-url>
cd task-queue-project

# Install dependencies
pip install -r requirements.txt

2. Configuration

Create a .env file from the example:

cp .env.example .env

Edit .env and provide your credentials:

  • sender_email: Your Gmail address.
  • gmail_password: Your Gmail App Password.
  • redis_backend: redis://localhost:6379/0
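As a rough illustration of how these three keys might be consumed, the sketch below reads them from a mapping (by default the process environment) and fails fast when a credential is missing. The `Settings` class and `load_settings` function are hypothetical names for this example; it also assumes the project loads `.env` into the environment first, for instance with python-dotenv's `load_dotenv()`.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    sender_email: str
    gmail_password: str
    redis_backend: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read the keys listed above; raise if a credential is missing."""
    missing = [key for key in ("sender_email", "gmail_password") if not env.get(key)]
    if missing:
        raise RuntimeError(f"Missing required settings: {', '.join(missing)}")
    return Settings(
        sender_email=env["sender_email"],
        gmail_password=env["gmail_password"],
        # Assumed fallback: the local Redis URL shown in the list above.
        redis_backend=env.get("redis_backend", "redis://localhost:6379/0"),
    )
```

Passing the mapping explicitly, as in `load_settings({"sender_email": "...", "gmail_password": "..."})`, keeps the function easy to exercise without touching the real environment.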

3. Running the System

You will need three terminal windows:

Window 1: Redis (if not already running)

redis-server

Window 2: Dramatiq Worker

dramatiq tasks

Window 3: Web Server

uvicorn main:app --port 8000 --reload

🛠️ Usage

Client Demonstration

Open index.html in your browser. It automatically sets a user_id cookie and connects to the SSE endpoint.
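For clients other than a browser, the event stream can be decoded by hand. The following is a simplified sketch of a text/event-stream parser covering only the `event`, `id`, and `data` fields; `parse_sse` is a hypothetical helper, and the sample event name and payload are assumptions, not the server's documented output.

```python
from typing import Dict, Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Yield one dict per event from an iterable of event-stream lines."""
    event: Dict[str, str] = {}
    data_lines = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data_lines:
                event["data"] = "\n".join(data_lines)
                yield event
            event, data_lines = {}, []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space is not part of the value
        if field == "data":
            data_lines.append(value)
        elif field in ("event", "id"):
            event[field] = value


sample = ["event: task_update\n", 'data: {"status": "done"}\n', "\n", ": ping\n", "\n"]
for parsed in parse_sse(sample):
    print(parsed)
```

The same function works on any iterable of decoded lines, such as the lines of a streaming HTTP response body.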

Manual Task Submission

Send a request using curl:

curl -X POST http://localhost:8000/send-email \
  -H "Content-Type: application/json" \
  --cookie "user_id=test-user-123" \
  -d '{
    "email": "user@example.com",
    "subject": "Hello from Task Queue!",
    "body": "This is a real-time test."
  }'
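The same request can be built from Python with only the standard library. This sketch mirrors the curl call above field for field; `build_send_email_request` is a hypothetical helper name, and actually sending the request assumes the web server from the previous section is running on port 8000.

```python
import json
import urllib.request


def build_send_email_request(base_url, user_id, email, subject, body):
    """Build (but do not send) the POST request shown in the curl example."""
    payload = json.dumps({"email": email, "subject": subject, "body": body})
    return urllib.request.Request(
        url=f"{base_url}/send-email",
        data=payload.encode("utf-8"),
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Cookie": f"user_id={user_id}",
        },
    )


request = build_send_email_request(
    "http://localhost:8000",
    "test-user-123",
    "user@example.com",
    "Hello from Task Queue!",
    "This is a real-time test.",
)
# To send it (requires the server to be running):
#     with urllib.request.urlopen(request) as response:
#         print(response.status, response.read().decode())
```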

🛠️ Requirements

  • fastapi
  • dramatiq
  • redis
  • uvicorn
  • python-dotenv

📄 License

Distributed under the MIT License. See LICENSE for more information.