-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsupabase_client.py
More file actions
184 lines (140 loc) · 6.38 KB
/
Copy pathsupabase_client.py
File metadata and controls
184 lines (140 loc) · 6.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
"""Простой клиент для Supabase"""
import os
from dotenv import load_dotenv
import httpx
load_dotenv()
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_ANON_KEY")
HEADERS = {
"apikey": SUPABASE_KEY,
"Authorization": f"Bearer {SUPABASE_KEY}",
"Content-Type": "application/json",
}
async def _request(method: str, table: str, data: dict = None, filters: dict = None):
"""Базовый запрос к Supabase"""
url = f"{SUPABASE_URL}/rest/v1/{table}"
# Добавляем фильтры в URL
if filters:
filter_parts = []
for key, value in filters.items():
filter_parts.append(f"{key}=eq.{value}")
url += "?" + "&".join(filter_parts)
async with httpx.AsyncClient(timeout=10.0) as client:
if method == "GET":
response = await client.get(url, headers=HEADERS)
elif method == "POST":
response = await client.post(url, json=data, headers=HEADERS)
elif method == "PATCH":
response = await client.patch(url, json=data, headers=HEADERS)
elif method == "DELETE":
response = await client.delete(url, headers=HEADERS)
else:
raise ValueError(f"Unknown method: {method}")
if response.status_code in [200, 201, 204]:
# Пустой ответ возвращаем как пустой dict
if not response.content:
return {}
return response.json()
# Ошибка - логируем
print(f"Supabase error: {response.status_code} - {response.text}")
return None
# ==================== COUNTRIES ====================
async def get_countries() -> list:
"""Получить все страны"""
result = await _request("GET", "countries")
if result:
# Сортируем по name, берем только активные
return [c for c in result if c.get("is_active", True)]
return []
async def get_country_by_id(country_id: int) -> dict:
"""Получить страну по ID"""
result = await _request("GET", "countries", filters={"id": country_id})
if result and len(result) > 0:
return result[0]
return None
# ==================== LEADS ====================
async def create_lead(
telegram_id: int,
name: str,
phone: str,
country_id: int = None,
message: str = None,
) -> dict:
"""Создать заявку и вернуть созданную запись с ID"""
data = {
"telegram_id": telegram_id,
"name": name,
"phone": phone,
}
if country_id:
data["country_id"] = country_id
if message:
data["message"] = message
# Создаём
await _request("POST", "leads", data=data)
# Сразу получаем созданную заявку (сортируем по created_at DESC)
result = await _request("GET", "leads", filters={"telegram_id": telegram_id})
if result and len(result) > 0:
# Сортируем по created_at и берём последнюю
sorted_leads = sorted(result, key=lambda x: x.get("created_at", ""), reverse=True)
return sorted_leads[0]
return {}
async def get_leads(limit: int = 50) -> list:
"""Получить заявки (для админа)"""
result = await _request("GET", "leads")
if result:
return result[:limit]
return []
# ==================== SETTINGS ====================
async def get_settings() -> dict:
"""Получить настройки бота"""
result = await _request("GET", "settings", filters={"id": 1})
if result and len(result) > 0:
return result[0]
return {}
async def update_settings(data: dict) -> dict:
"""Обновить настройки"""
return await _request("PATCH", "settings", data=data, filters={"id": 1})
# ==================== TOURS (опционально) ====================
async def get_tours(country_id: int = None, limit: int = 20) -> list:
"""Получить туры"""
filters = {"is_active": True}
if country_id:
filters["country_id"] = country_id
result = await _request("GET", "tours", filters=filters)
if result:
return result[:limit]
return []
async def get_tour(tour_id: int) -> dict:
"""Получить тур по ID"""
result = await _request("GET", "tours", filters={"id": tour_id})
if result and len(result) > 0:
return result[0]
return None
# ==================== SETTINGS (AI PROMPT) ====================
async def get_ai_prompt() -> str:
"""Получить AI промпт из настроек"""
result = await _request("GET", "settings", filters={"id": 1})
if result and len(result) > 0:
return result[0].get("ai_prompt", """Ты — ассистент туристического агентства. Отвечай на вопросы о странах, визах, погоде. Отвечай кратко (2-4 предложения), по делу, на русском.""")
return """Ты — ассистент туристического агентства. Отвечай на вопросы о странах, визах, погоде. Отвечай кратко (2-4 предложения), по делу, на русском."""
async def update_ai_prompt(new_prompt: str) -> bool:
"""Обновить AI промпт"""
result = await _request("PATCH", "settings", data={"ai_prompt": new_prompt}, filters={"id": 1})
return result is not None
async def add_clarification(lead_id: int, clarification: str) -> bool:
"""Добавить уточнение к заявке"""
result = await _request("PATCH", "leads", data={"clarification": clarification}, filters={"id": lead_id})
return result is not None
async def get_lead_with_details(lead_id: int) -> dict:
"""Получить заявку с деталями"""
result = await _request("GET", "leads", filters={"id": lead_id})
if result and len(result) > 0:
return result[0]
return None
async def get_welcome_message() -> str:
"""Получить приветственное сообщение"""
result = await _request("GET", "settings", filters={"id": 1})
if result and len(result) > 0:
return result[0].get("welcome_message", "👋 Добро пожаловать!")
return "👋 Добро пожаловать!"