Skip to content
Merged
106 changes: 106 additions & 0 deletions app/internal/connection_manager.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import time
from collections import deque

from aiokafka.admin import AIOKafkaAdminClient
from fastapi import WebSocket


Expand All @@ -16,6 +20,19 @@ def __init__(self):
self.active_connections: list[WebSocket] = []
self.nicknames: dict[WebSocket, str] = {} # Store websocket -> nickname mapping

# Metrics tracking
self.message_timestamps = deque(maxlen=1000) # Track last 1000 message timestamps
self.cdc_events = {'create': 0, 'update': 0, 'delete': 0, 'snapshot': 0}
self.cdc_events_24h = deque(maxlen=10000) # Store events with timestamps for 24h tracking
self.total_messages = 0
self.start_time = time.time()
self.kafka_topics_count = 0 # Track Kafka topics count
self.kafka_bootstrap_servers = 'kafka-debezium:9092' # Default Kafka server

# Throughput tracking
self.cdc_event_timestamps = deque(maxlen=1000) # CDC events with timestamps for events/sec
self.bytes_transferred = deque(maxlen=1000) # Bytes with timestamps for bytes/sec tracking

async def connect(self, websocket: WebSocket, client_id: str, nickname: str = None):
await websocket.accept()
self.active_connections.append(websocket)
Expand All @@ -38,6 +55,33 @@ async def disconnect(self, websocket: WebSocket, client_id: str):
await self.broadcast(f'👋 {nickname} left the chat')

async def broadcast(self, message: str):
# Track message for metrics
current_time = time.time()
self.message_timestamps.append(current_time)
self.total_messages += 1

# Track bytes transferred (message size in bytes)
message_bytes = len(message.encode('utf-8'))
self.bytes_transferred.append({'bytes': message_bytes, 'timestamp': current_time})

# Track CDC events
if '[Created]' in message or 'Created' in message:
self.cdc_events['create'] += 1
self.cdc_events_24h.append({'type': 'create', 'timestamp': current_time})
self.cdc_event_timestamps.append(current_time)
elif '[Updated]' in message or 'Updated' in message:
self.cdc_events['update'] += 1
self.cdc_events_24h.append({'type': 'update', 'timestamp': current_time})
self.cdc_event_timestamps.append(current_time)
elif '[Deleted]' in message or 'Deleted' in message:
self.cdc_events['delete'] += 1
self.cdc_events_24h.append({'type': 'delete', 'timestamp': current_time})
self.cdc_event_timestamps.append(current_time)
elif '[Snapshot]' in message or 'Snapshot' in message:
self.cdc_events['snapshot'] += 1
self.cdc_events_24h.append({'type': 'snapshot', 'timestamp': current_time})
self.cdc_event_timestamps.append(current_time)

for connection in self.active_connections:
await connection.send_text(message)

Expand All @@ -48,3 +92,65 @@ def is_nickname_taken(self, nickname: str) -> bool:
def get_nickname(self, websocket: WebSocket) -> str:
"""Get nickname for a websocket connection"""
return self.nicknames.get(websocket, 'Anonymous')

async def fetch_kafka_topics_count(self) -> int:
"""Fetch the current count of Kafka topics"""
try:
admin_client = AIOKafkaAdminClient(bootstrap_servers=self.kafka_bootstrap_servers)
await admin_client.start()
try:
# Get list of all topics
topics = await admin_client.list_topics()
# Filter out internal topics (those starting with __)
user_topics = [topic for topic in topics if not topic.startswith('__')]
self.kafka_topics_count = len(user_topics)
return self.kafka_topics_count
finally:
await admin_client.close()
except Exception as e:
print(f'Error fetching Kafka topics: {e}')
# Return cached value or 0 if never fetched
return self.kafka_topics_count

def get_metrics(self) -> dict:
"""Get current system metrics"""
current_time = time.time()

# Calculate messages per minute (last 60 seconds)
one_minute_ago = current_time - 60
recent_messages = sum(1 for ts in self.message_timestamps if ts >= one_minute_ago)

# Calculate CDC events per second (last 60 seconds)
cdc_events_last_minute = sum(1 for ts in self.cdc_event_timestamps if ts >= one_minute_ago)
cdc_events_per_sec = round(cdc_events_last_minute / 60, 2) if cdc_events_last_minute > 0 else 0

# Calculate bytes per second (last 60 seconds)
bytes_last_minute = sum(item['bytes'] for item in self.bytes_transferred if item['timestamp'] >= one_minute_ago)
bytes_per_sec = round(bytes_last_minute / 60, 2) if bytes_last_minute > 0 else 0

# Calculate events in last 24 hours by type
twenty_four_hours_ago = current_time - (24 * 60 * 60)
events_24h = [e for e in self.cdc_events_24h if e['timestamp'] >= twenty_four_hours_ago]

events_24h_by_type = {
'create': sum(1 for e in events_24h if e['type'] == 'create'),
'update': sum(1 for e in events_24h if e['type'] == 'update'),
'delete': sum(1 for e in events_24h if e['type'] == 'delete'),
'snapshot': sum(1 for e in events_24h if e['type'] == 'snapshot'),
}

# Calculate uptime
uptime_seconds = int(current_time - self.start_time)

return {
'connected_users': len(self.active_connections),
'messages_per_minute': recent_messages,
'total_messages': self.total_messages,
'cdc_events': self.cdc_events,
'events_24h': events_24h_by_type,
'uptime_seconds': uptime_seconds,
'active_nicknames': list(self.nicknames.values()),
'kafka_topics': self.kafka_topics_count,
'cdc_events_per_sec': cdc_events_per_sec,
'bytes_per_sec': bytes_per_sec,
}
10 changes: 10 additions & 0 deletions app/routes/websockets.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import re

from fastapi import APIRouter, Query, WebSocket
from fastapi.responses import JSONResponse
from starlette.websockets import WebSocketDisconnect

from app.internal.connection_manager import ConnectionManager
Expand Down Expand Up @@ -57,3 +58,12 @@ async def websocket_endpoint(websocket: WebSocket, client_id: str, nickname: str
await manager.broadcast(message)
except WebSocketDisconnect:
await manager.disconnect(websocket, client_id)


@router.get('/metrics')
async def get_metrics():
"""Get real-time system metrics"""
# Fetch Kafka topics count (async call)
await manager.fetch_kafka_topics_count()
metrics = manager.get_metrics()
return JSONResponse(content=metrics)
Loading
Loading