-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathmain.py
More file actions
122 lines (90 loc) · 3.46 KB
/
main.py
File metadata and controls
122 lines (90 loc) · 3.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import os
import html
import uvicorn
from fastapi import FastAPI, Request
from fastapi.responses import Response
from lib.tracker import track_stocks
from lib.agent import handle_incoming_message, run_research_pipeline
import asyncio
from apscheduler.schedulers.background import BackgroundScheduler
from lib.stock_checker import get_stock_price
from lib.sms import send_sms
import sys
import json
from twilio.request_validator import RequestValidator
from dotenv import load_dotenv
load_dotenv()
# FASTAPI Configuration
app = FastAPI()
TWILIO_AUTH_TOKEN = os.getenv("TWILIO_AUTH_TOKEN") # Set this in your environment
TARGET_PHONE_NUMBER = os.getenv("TARGET_PHONE_NUMBER") # Set this in your environment
WEBHOOK_URL = os.getenv("WEBHOOK_URL") # This should be your public Twilio webhook URL
@app.post("/receive-message")
async def receive_message(request: Request):
signature = request.headers.get("X-Twilio-Signature", "")
form = await request.form()
validator = RequestValidator(TWILIO_AUTH_TOKEN)
params = dict(form)
# Use WEBHOOK_URL for validation, not request.url
if not validator.validate(WEBHOOK_URL, params, signature):
return Response(content="Invalid signature", status_code=403)
from_number = form.get("From", "")
to_number = form.get("To", "")
body = form.get("Body", "")
if (to_number != os.getenv("TWILIO_PHONE_NUMBER")):
return Response(content="Unauthorized", status_code=403)
if (from_number != TARGET_PHONE_NUMBER):
return Response(content="Unauthorized", status_code=403)
# Get the body content and escape it
safe_body = html.escape(body)
Response(content="OK", status_code=200)
# Process the message
response_text = await handle_incoming_message(safe_body)
# Send the response back to the user
print(f"Response: {response_text}")
send_sms(response_text)
return
# For testing purposes only
async def chat_terminal():
print("Chat mode activated. Type 'exit' to quit.")
while True:
user_input = input("You: ")
if user_input.lower() == "exit":
print("Exiting chat.")
break
response = await handle_incoming_message(user_input)
print(f"Bot: {response}")
# Run the project
if __name__ == "__main__":
# Check to make sure the alert_history.json and tracker_list.json exist, otherwise create them
if not os.path.exists("resources"):
os.makedirs("resources")
if not os.path.exists("resources/alert_history.json"):
with open("resources/alert_history.json", "w") as f:
json.dump({}, f)
if not os.path.exists("resources/tracker_list.json"):
with open("resources/tracker_list.json", "w") as f:
json.dump([], f)
if "-test" in sys.argv:
if "-research" in sys.argv:
stock_symbol = sys.argv[sys.argv.index("-research") + 1]
stock_price = get_stock_price(stock_symbol)
asyncio.run(run_research_pipeline(stock_symbol, stock_price.current_price, stock_price.previous_close))
else:
# CRON Job for tracking stock prices
scheduler = BackgroundScheduler()
scheduler.add_job(track_stocks, 'interval', minutes=1)
scheduler.start()
asyncio.run(chat_terminal())
else:
# CRON Job for tracking stock prices
scheduler = BackgroundScheduler()
scheduler.add_job(track_stocks, 'interval', hours=1)
scheduler.start()
# Run with: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
uvicorn.run(
"main:app",
host=os.getenv("HOST", "0.0.0.0"),
port=int(os.getenv("PORT", "8000")),
reload=True,
)