-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
115 lines (90 loc) · 3.79 KB
/
main.py
File metadata and controls
115 lines (90 loc) · 3.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# fastapi endpoints
import queue
from dramatiq.message import Message
from dramatiq.results import ResultMissing, ResultFailure
from dramatiq.results.backends import RedisBackend
import json
from fastapi import FastAPI,Request
from fastapi.responses import StreamingResponse
import asyncio
import redis.asyncio as aioredis
import redis
from fastapi import Depends
from fastapi import HTTPException
from models import Recipient
redis_client = redis.Redis(host='localhost', port=6379, db=0)
result_backend = RedisBackend(url="redis://localhost:6379")
# from fastapi import Request
from tasks import send_email, process_image
from fastapi.middleware.cors import CORSMiddleware
def login_check(request: Request):
    """FastAPI dependency: require a ``user_id`` cookie and return its value.

    Raises:
        HTTPException: 401 when the cookie is absent.
    """
    user_id = request.cookies.get("user_id")
    if user_id:
        print(f"Authenticated user: {user_id}")
        return user_id
    print("Unauthorized access attempt")
    raise HTTPException(status_code=401, detail="Unauthorized")
# Every route on the app requires the user_id cookie via login_check.
app = FastAPI(dependencies=[Depends(login_check)])
# CORS: allow only the local front-end origin; credentials enabled so the
# user_id cookie is sent with cross-origin requests.
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5500"], # React app origin
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Per-client fan-out registry: user_id -> list of asyncio.Queue objects,
# one queue per currently open SSE connection for that user.
listeners: dict[str, list[asyncio.Queue]] = {}
# One global, forever-running pub/sub listener, started once at app startup.
@app.on_event("startup")
async def start_redis_listener():
    """Launch the global Redis pub/sub listener as a background task.

    The task reference is kept on ``app.state`` — asyncio holds only weak
    references to tasks, so a discarded ``create_task`` result can be
    garbage-collected and silently cancelled mid-flight.
    """
    app.state.redis_listener_task = asyncio.create_task(redis_listener())
async def redis_listener():
    """Forward every ``client:*`` pub/sub message to the local SSE queues.

    Runs forever: a single pattern subscription covers all client channels,
    and each incoming message is fanned out to every asyncio.Queue registered
    in ``listeners`` for the matching client id.
    """
    # Local connection (does not shadow the module-level sync client).
    redis_conn = aioredis.from_url("redis://localhost:6379")
    pubsub = redis_conn.pubsub()
    await pubsub.psubscribe("client:*")  # subscribe to ALL client channels at once
    async for message in pubsub.listen():
        # listen() also yields psubscribe confirmations; only real pattern
        # messages carry payloads we care about.
        if message["type"] != "pmessage":
            continue
        channel = message["channel"].decode()  # e.g. "client:abc"
        data = message["data"].decode()
        # maxsplit=1 keeps client ids that themselves contain ":" intact.
        client_id = channel.split(":", 1)[1]
        # Local fan-out: one copy per open SSE connection for this client.
        for q in listeners.get(client_id, []):
            await q.put(data)
# SSE endpoint — just reads from a per-connection local asyncio.Queue.
@app.get("/listen/")
async def listen(req: Request):
    """Server-sent-events stream of pub/sub messages for the calling user.

    Registers a per-connection queue under the user's id; redis_listener()
    fans messages into it.  A ``:ping`` comment is emitted every 25 s so
    proxies do not time out the idle connection.  The queue is deregistered
    when the stream closes.
    """
    client_id = req.cookies.get("user_id")
    # Named msg_queue (not "queue") to avoid shadowing the stdlib module
    # imported at the top of this file.
    msg_queue: asyncio.Queue = asyncio.Queue()
    listeners.setdefault(client_id, []).append(msg_queue)

    async def stream():
        try:
            while True:
                try:
                    data = await asyncio.wait_for(msg_queue.get(), timeout=25)
                    yield f"data: {data}\n\n"
                except asyncio.TimeoutError:
                    # Heartbeat comment line; ignored by EventSource clients.
                    yield ":ping\n\n"
        finally:
            # Client disconnected: drop this connection's queue, and the
            # whole client entry once its last connection is gone.
            listeners[client_id].remove(msg_queue)
            if not listeners[client_id]:
                del listeners[client_id]

    return StreamingResponse(stream(), media_type="text/event-stream")
@app.get("/task/{task_id}")
def get_task(task_id: str,req: Request):
# here goes the logic to fetch task details
message = Message(message_id=task_id,queue_name="default",actor_name="send_email", args=[], kwargs={},
options={},) # dummy message to fetch result
result = result_backend.get_result(message)
return {"message": "Task details", "task_id": task_id, "result": result}
@app.post("/send-email")
async def send_email_endpoint(recipient:Recipient,req: Request):
# data = await req.json()
# recipient = data.get("recipient")
# subject = data.get("subject")
# body = data.get("body")
message = send_email.send(recipient.email, recipient.subject, recipient.body,requester=req.cookies.get("user_id")) # 👈 send email to queue
return {"message": "Email sent to queue", "recipient": recipient, "message_id": message.message_id}