-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
99 lines (80 loc) · 2.74 KB
/
utils.py
File metadata and controls
99 lines (80 loc) · 2.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import os
import base64
import requests
from email import message_from_bytes
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from transformers import pipeline
from dotenv import load_dotenv
# Load settings through python-dotenv so the GOOGLE_* values read by
# refresh_access_token() can be supplied without exporting them by hand.
load_dotenv()

# Text-classification pipeline built once at import time from the local
# path ./email_model; classify_emails() reuses this single instance.
classifier = pipeline("text-classification", model="./email_model")

# Translates the raw label ids emitted by the pipeline into the category
# names reported to callers.
label_map = dict(
    LABEL_0="NORMAL",
    LABEL_1="SPAM",
    LABEL_2="FRAUD",
)
def refresh_access_token(timeout: float = 10.0):
    """Exchange the stored Google refresh token for a fresh access token.

    Reads ``GOOGLE_REFRESH_TOKEN``, ``GOOGLE_CLIENT_ID`` and
    ``GOOGLE_CLIENT_SECRET`` from the environment and posts them to Google's
    OAuth 2.0 token endpoint with the ``refresh_token`` grant type.

    Args:
        timeout: Seconds to wait for the token endpoint before giving up.

    Returns:
        The ``access_token`` value from the token response.

    Raises:
        RuntimeError: If a required environment variable is unset or empty,
            if the endpoint answers with a non-200 status, or if the
            response body does not contain an ``access_token``.
    """
    # Form field name -> environment variable that supplies it.
    env_names = {
        "client_id": "GOOGLE_CLIENT_ID",
        "client_secret": "GOOGLE_CLIENT_SECRET",
        "refresh_token": "GOOGLE_REFRESH_TOKEN",
    }
    credentials = {field: os.getenv(var) for field, var in env_names.items()}

    # Fail fast with an actionable message instead of sending an incomplete
    # request and surfacing an opaque error from the remote endpoint.
    missing = [env_names[field] for field, value in credentials.items() if not value]
    if missing:
        raise RuntimeError(
            "Failed to refresh access token: missing environment variable(s): "
            + ", ".join(missing)
        )

    # Without an explicit timeout the request could block indefinitely.
    response = requests.post(
        "https://oauth2.googleapis.com/token",
        data={**credentials, "grant_type": "refresh_token"},
        timeout=timeout,
    )
    if response.status_code != 200:
        raise RuntimeError(f"Failed to refresh access token: {response.text}")

    try:
        return response.json()["access_token"]
    except (KeyError, ValueError) as err:
        # A 200 reply with an unexpected or non-JSON body.
        raise RuntimeError(
            f"Failed to refresh access token: {response.text}"
        ) from err
def _find_header(headers_list, name):
    """Return the value of the first header called *name* (case-insensitive), or None."""
    wanted = name.lower()
    for header in headers_list:
        if header.get("name", "").lower() == wanted:
            return header.get("value")
    return None


def get_last_ten_emails(max_results=10, query="newer_than:7d", timeout=10.0):
    """Fetch summary information for recent Gmail messages.

    Obtains an access token via ``refresh_access_token()``, lists message ids
    matching *query*, then requests each message individually to read its
    ``Subject``, ``From`` and ``Date`` headers and its snippet.

    Args:
        max_results: Value sent as ``maxResults`` on the list request.
        query: Gmail search expression sent as ``q`` on the list request.
        timeout: Seconds to wait for each HTTP request.

    Returns:
        A list of dicts with the keys ``message_id``, ``date``, ``from``,
        ``subject`` and ``snippet``. Header fields are ``None`` when the
        message does not carry that header.

    Raises:
        RuntimeError: If the list request or any per-message request returns
            a non-200 status.
    """
    access_token = refresh_access_token()
    auth_headers = {"Authorization": f"Bearer {access_token}"}
    base_url = "https://gmail.googleapis.com/gmail/v1/users/me/messages"

    listing = requests.get(
        base_url,
        headers=auth_headers,
        params={"maxResults": max_results, "q": query},
        timeout=timeout,
    )
    if listing.status_code != 200:
        raise RuntimeError(
            f"Failed to fetch messages: {listing.status_code} {listing.text}"
        )

    emails = []
    # Only the id is taken from the listing; details need one request each.
    for msg in listing.json().get("messages", []):
        msg_id = msg["id"]
        detail = requests.get(
            f"{base_url}/{msg_id}",
            headers=auth_headers,
            timeout=timeout,
        )
        # An error body would otherwise be parsed as if it were a message and
        # silently produce a record full of None values.
        if detail.status_code != 200:
            raise RuntimeError(
                f"Failed to fetch message {msg_id}: "
                f"{detail.status_code} {detail.text}"
            )

        msg_data = detail.json()
        headers_list = msg_data.get("payload", {}).get("headers", [])
        emails.append({
            "message_id": msg_id,
            "date": _find_header(headers_list, "Date"),
            "from": _find_header(headers_list, "From"),
            "subject": _find_header(headers_list, "Subject"),
            "snippet": msg_data.get("snippet", ""),
        })
    return emails
def classify_emails(email_data):
    """Classify each email record and attach a label and confidence.

    The text passed to the module-level ``classifier`` is the record's
    subject and snippet joined by a single space.

    Args:
        email_data: Iterable of dict-like email records; the ``subject`` and
            ``snippet`` entries are used when present.

    Returns:
        A list with one dict per input record, containing every original
        key plus ``label`` (mapped through ``label_map``, falling back to the
        raw model label) and ``confidence`` (the score as a percentage
        rounded to two decimals).
    """
    results = []
    for record in email_data:
        # A missing subject arrives as None; formatting it directly would feed
        # the literal word "None" to the model, so empty parts are dropped.
        parts = (record.get("subject"), record.get("snippet"))
        text = " ".join(part for part in parts if part)

        pred = classifier(text, truncation=True)[0]
        results.append({
            **record,
            "label": label_map.get(pred["label"], pred["label"]),
            "confidence": round(pred["score"] * 100, 2),
        })
    return results