Skip to content

Conversation

@azzi2023
Copy link

No description provided.

azzi2023 and others added 6 commits December 8, 2025 05:55
- Added Redis client and caching service
- Integrated Redis into application startup and shutdown events
- Introduced rate limiting and request logging middlewares
- Updated Docker configuration to include Redis service
- Enhanced error handling and response formatting
- Introduced `docker-compose.override.test-dev.yml` for local development services including backend, frontend, database, and mailcatcher.

- Created `docker-compose.test-dev.yml` for a complete test environment setup with health checks and Redis service.

- Updated main `docker-compose.yml` to use a custom PostgreSQL image with pgvector support.

- Enhanced README with instructions for PostgreSQL 18 and background task management using Celery and Redis.

- Added Cloudflare R2 integration for S3-compatible storage.

- Implemented new Celery worker and task management structure.
- Removed Traefik network and related labels from `docker-compose.override.test-dev.yml`, `docker-compose.override.yml`, and `docker-compose.test-dev.yml` to simplify local development setup.
- Introduced a new `dockercompose-dev.yml` for streamlined development services.
- Added WebSocket infrastructure with a dedicated manager and endpoint for real-time communication using Redis.
- Updated startup and shutdown events in the backend to manage WebSocket connections.
- Created scripts for easier Docker Compose command execution with the new configurations.
Copy link

@coding-sunshine coding-sunshine left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review: PR #3 - Boilerplate and Helper Setup

Status: APPROVE WITH MANDATORY CHANGES


BLOCKERS (Must Fix Before Merge)

1. In-Memory Rate Limiter - Won't Work in Production

Problem: Uses in-memory storage that won't work with multiple backend instances.

Issues:

  • Each instance tracks rates independently (bypassed by load balancer)
  • Memory grows unbounded between 60s cleanup cycles
  • No persistence across restarts
  • Vulnerable to distributed attacks

Solution: Use Redis-backed rate limiting

class RedisRateLimiter:
    def __init__(self, redis: Redis, requests_per_minute: int = 100):
        self.redis = redis
        self.limit = requests_per_minute
        self.window = 60

    async def is_allowed(self, client_ip: str) -> tuple[bool, dict]:
        key = f"rate_limit:{client_ip}"
        now = datetime.now().timestamp()
        window_start = now - self.window

        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)
        pipe.zadd(key, {str(now): now})
        pipe.zcard(key)
        pipe.expire(key, self.window)

        _, _, count, _ = await pipe.execute()

        remaining = max(0, self.limit - count)
        return count <= self.limit, {"remaining": remaining}

2. Redis Singleton Pattern - Race Conditions

Problem: Singleton pattern has race conditions in async context.

Issues:

  • if cls._instance is None is not atomic
  • Global state breaks testing
  • Violates FastAPI dependency injection patterns

Solution: Use dependency injection

_redis_pool: aioredis.ConnectionPool | None = None

async def get_redis_pool() -> aioredis.ConnectionPool:
    global _redis_pool
    if _redis_pool is None:
        _redis_pool = aioredis.ConnectionPool.from_url(
            settings.REDIS_URL,
            encoding="utf-8",
            decode_responses=True,
            max_connections=50,
            socket_connect_timeout=5,
            health_check_interval=30,
        )
    return _redis_pool

async def get_redis() -> AsyncGenerator[aioredis.Redis, None]:
    pool = await get_redis_pool()
    redis = aioredis.Redis(connection_pool=pool)
    try:
        yield redis
    finally:
        await redis.close()

3. WebSocket Security - No Authentication

Critical Security Issues:

  1. No authentication - anyone can connect
  2. No authorization - no room access control
  3. No rate limiting - DoS vulnerable
  4. No message validation
  5. XSS vulnerability if messages rendered as HTML

Solution: Add JWT authentication

@router.websocket("/ws/{room}")
async def websocket_endpoint(
    websocket: WebSocket,
    room: str,
    token: str = None  # From query: ?token=xxx
):
    # 1. Authenticate
    if not token:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    user = await verify_websocket_token(token)
    if not user:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    # 2. Authorize room access
    if not await verify_room_access(room, user):
        await websocket.close(code=status.WS_1003_UNSUPPORTED_DATA)
        return

    # 3. Connect with user context
    manager = websocket.app.state.ws_manager
    await manager.connect(websocket, room, user_id=user.id)

    # 4. Rate limiting
    MAX_MESSAGE_SIZE = 64 * 1024
    MAX_MESSAGES_PER_MINUTE = 60

    try:
        while True:
            data = await websocket.receive_text()

            if len(data) > MAX_MESSAGE_SIZE:
                continue

            # Add user context to message
            message = json.loads(data)
            message["user_id"] = user.id
            message["timestamp"] = datetime.utcnow().isoformat()

            await manager.publish(room, json.dumps(message))
    except WebSocketDisconnect:
        logger.info(f"WS disconnected: {user.id}")
    finally:
        await manager.disconnect(websocket, room)

4. Missing Test Coverage

38 files changed, 0 test files added

Required Tests:

tests/
├── integration/
│   ├── test_redis_client.py
│   ├── test_websocket_manager.py
│   └── test_celery_tasks.py
├── unit/
│   ├── test_rate_limiter.py
│   ├── test_middlewares.py
│   └── test_cache_service.py
└── e2e/
    └── test_websocket_flow.py

Minimum Coverage: 60%


5. Missing Dependencies

Problem: Imports reference non-existent files in backend/app/middlewares/error_handler.py

Solution: Create app/core/exceptions.py

from typing import Any

class AppException(Exception):
    def __init__(self, message: str, status_code: int = 500, details: Any = None):
        self.message = message
        self.status_code = status_code
        self.details = details
        super().__init__(self.message)

class NotFoundException(AppException):
    def __init__(self, message: str = "Not found", details: Any = None):
        super().__init__(message, status_code=404, details=details)

class UnauthorizedException(AppException):
    def __init__(self, message: str = "Unauthorized", details: Any = None):
        super().__init__(message, status_code=401, details=details)

class ForbiddenException(AppException):
    def __init__(self, message: str = "Forbidden", details: Any = None):
        super().__init__(message, status_code=403, details=details)

✅ Strengths

Infrastructure Choices

  • ✅ PostgreSQL 18 + pgvector (perfect for AI/vector search)
  • ✅ Redis for caching and pub/sub
  • ✅ Celery for background tasks
  • ✅ WebSockets with Redis pub/sub (enables horizontal scaling)
  • ✅ R2 S3-compatible storage

Architecture

  • ✅ Scalable WebSocket design (Redis pub/sub across instances)
  • ✅ Proper configuration management
  • ✅ Good separation of concerns

📋 Merge Checklist

  • Remove .DS_Store file and add it to .gitignore
  • Fix rate limiter (use Redis)
  • Fix Redis client (use dependency injection)
  • Add WebSocket authentication
  • Create app/core/exceptions.py
  • Add test coverage (minimum 60%)
  • Fix error handling (no silent swallowing)
  • Add dynamic thread pool sizing
  • Tighten dependency version constraints
  • Register error handlers in main.py
  • Document rollback strategy

Review completed by: Claude Code (CTO Analysis)
Contact: For questions about this review, consult the inline comments above.

class RateLimiterMiddleware(BaseHTTPMiddleware):
def __init__(self, app, requests_per_minute: int = 100):
super().__init__(app)
self.requests_per_minute = requests_per_minute

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 BLOCKER: In-memory rate limiting won't work in production with multiple instances.

Each backend instance will have its own self.requests dictionary, so requests can bypass rate limits by hitting different instances through the load balancer.

Required Fix: Use Redis for distributed rate limiting (see main review for solution).


@classmethod
async def get_client(cls) -> aioredis.Redis:
if cls._instance is None:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 BLOCKER: Singleton pattern has race conditions in async context.

The check if cls._instance is None is not atomic - multiple coroutines could pass this check simultaneously, creating multiple Redis instances.

Issues:

  • Global state makes testing difficult
  • Violates FastAPI's dependency injection pattern
  • No connection pool visibility

Required Fix: Use dependency injection pattern with get_redis() generator (see main review).



@router.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 BLOCKER: Critical security vulnerability - No authentication on WebSocket endpoint.

Security Issues:

  1. ❌ Anyone can connect without authentication
  2. ❌ No authorization - users can join any room
  3. ❌ No rate limiting - vulnerable to DoS attacks
  4. ❌ No message validation or size limits
  5. ❌ XSS vulnerability if messages are rendered as HTML

Required Fix:

  • Add JWT token authentication via query parameter
  • Implement room access authorization
  • Add rate limiting (max messages per minute)
  • Validate message size and content
  • Add user context to all messages

See main review for complete implementation example.

from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from starlette.exceptions import HTTPException as StarletteHTTPException
from app.core.exceptions import AppException

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 BLOCKER: Import references non-existent file.

The file app/core/exceptions.py doesn't exist yet, which will cause import errors when this middleware is loaded.

Required Fix: Create backend/app/core/exceptions.py with AppException, NotFoundException, UnauthorizedException, and ForbiddenException classes.

See main review for complete implementation.

if isinstance(data, (bytes, bytearray)):
data = data.decode()
# channel format: ws:<room>
try:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ CRITICAL: Silent error swallowing makes debugging impossible.

Using bare except Exception: continue hides ALL errors, including:

  • Invalid channel format
  • Deserialization errors
  • Network failures
  • Logic bugs

Fix: Add specific exception handling with logging:

try:
    room = str(channel).split("ws:", 1)[1]
except IndexError:
    logger.error(f"Invalid channel format: {channel}", extra={"message": message})
    continue
except Exception as e:
    logger.exception(f"Unexpected error: {e}", extra={"message": message})
    # Send to error tracking (Sentry, etc.)
    continue

self._pubsub = None
self._listen_task: asyncio.Task | None = None

async def start(self) -> None:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent architectural choice! Using Redis pub/sub for WebSocket communication enables horizontal scaling.

This design allows multiple backend instances to share WebSocket messages, which is critical for production deployments behind load balancers.

Benefits:

  • ✅ Multiple app instances can sync in real-time
  • ✅ WebSocket connections can be distributed across servers
  • ✅ Room-based messaging with pattern subscriptions (ws:*)
  • ✅ Proper separation between local connections and distributed messaging

Note: Just needs authentication layer (see security comments on ws.py).

def __init__(self, redis_client: aioredis.Redis):
self.redis = redis_client

async def get(self, key: str) -> Optional[dict]:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ CRITICAL: Silent error swallowing - can't distinguish "key missing" from "Redis down".

All errors return None, making it impossible to tell if:

  • The key doesn't exist (normal)
  • Redis connection failed (critical infrastructure failure)
  • Data is corrupted (needs cleanup)

Fix: Handle errors specifically:

async def get(self, key: str) -> Optional[dict]:
    try:
        value = await self.redis.get(key)
        return json.loads(value) if value else None
    except (json.JSONDecodeError, TypeError) as e:
        logger.warning(f"Invalid JSON in cache key {key}: {e}")
        await self.redis.delete(key)  # Clean corrupted data
        return None
    except (redis.exceptions.ConnectionError,
            redis.exceptions.TimeoutError) as e:
        logger.error(f"Redis connection error: {e}", exc_info=True)
        raise  # Let caller handle infrastructure failures

@sonarqubecloud
Copy link

Quality Gate Failed Quality Gate failed

Failed conditions
1 Security Hotspot
C Reliability Rating on New Code (required ≥ A)

See analysis details on SonarQube Cloud

Catch issues before they fail your Quality Gate with our IDE extension SonarQube for IDE

@coding-sunshine coding-sunshine merged commit e4ecf33 into OrganyzAI:master Dec 19, 2025
1 check failed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants