-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
227 lines (183 loc) · 8.7 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import asyncio
import logging
import sys
from environs import Env
from pytubefix import YouTube, CaptionQuery
from pytubefix.cli import on_progress
from io import BytesIO
from aiogram import Bot, Dispatcher, html, F, types
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from aiogram.filters import CommandStart, Command, StateFilter
from aiogram.types import Message, BufferedInputFile, URLInputFile
from aiogram.utils.chat_action import ChatActionSender
from aiogram.fsm.context import FSMContext
from download import get_video_info
from keyboards import format_kb, admin_kb, admin_confirm_kb, admin_statistic_btn_text, admin_send_btn_text
from db import User, db, migrate_db
from states import SendMessages
# Read configuration from the process environment.
env = Env()
env.read_env()

# Bot token, and the admin chat ids (handlers compare them against str(chat.id)).
TOKEN = env.str("BOT_TOKEN")
ADMINS = env.list("ADMINS")

# Every outgoing message is parsed as HTML unless a call overrides it.
_bot_defaults = DefaultBotProperties(parse_mode=ParseMode.HTML)
bot = Bot(token=TOKEN, default=_bot_defaults)
dp = Dispatcher()
@dp.message(CommandStart())
async def command_start_handler(message: Message) -> None:
    """Greet the sender of /start, record them as an active user, and tell admins about new users.

    The user's row is created on first contact and always (re)marked active.
    Admins additionally receive the admin keyboard. When the row was newly
    created, every chat id in ADMINS gets a "New user" notice.
    """
    # Messages are sent with HTML parse mode, so the display name must be
    # escaped before it is embedded; html.bold() does not escape its argument.
    full_name = html.quote(message.from_user.full_name)
    await message.answer(f"Hello, {html.bold(full_name)}!\n\nSend me a link to the YouTube video.")

    user, created = User.get_or_create(chat_id=message.chat.id, defaults={'lang': 'en', 'is_active': True})
    user.is_active = True
    user.save()

    if str(message.chat.id) in ADMINS:
        await message.answer("You are an admin.", reply_markup=admin_kb)

    if created:
        notice = f"New user: {full_name} ({message.chat.id})"
        for admin in ADMINS:
            # One unreachable admin must not stop the others from being notified.
            try:
                await bot.send_message(admin, notice)
            except Exception:
                logging.exception("Failed to notify admin %s about a new user", admin)
@dp.message(Command("dev"))
async def command_dev_handler(message: Message) -> None:
    """Reply to /dev with the developer's contact details."""
    contact_lines = (
        "π¨βπ» Developer: @QuvonchbekDev \n",
        "π§ Email: [email protected] \n",
        "π Website: https://moorfo.uz \n",
        "π± Telegram: @QuvonchbekDev \n",
        "π± Instagram: @moorfo.uz",
    )
    # Each entry already carries its own line break, so join without a separator.
    await message.answer("".join(contact_lines))
@dp.message(F.text == admin_statistic_btn_text)
async def admin_statistic_handler(message: Message) -> None:
    """Send the total and active user counts to an admin; do nothing for anyone else."""
    if str(message.chat.id) in ADMINS:
        total = User.select().count()
        # `== True` builds a query expression here, so it must not become `is True`.
        active = User.select().where(User.is_active == True).count()
        report = f"π Statistic\n\n Users: {total} \n Active users: {active}"
        await message.answer(report)
# Links waiting for a format choice, keyed by the (chat id, message id) of the
# format-selection message. Keeping this per message, rather than in shared
# globals, stops one user's button press from acting on another user's link.
_pending_videos = {}
_MAX_PENDING_VIDEOS = 512


def _remember_video(sent, link, video_info):
    """Store *link* and *video_info* under the selection message *sent*."""
    _pending_videos[(sent.chat.id, sent.message_id)] = (link, video_info)
    # Selections nobody clicks would otherwise pile up; dicts keep insertion
    # order, so the first key is always the oldest entry.
    while len(_pending_videos) > _MAX_PENDING_VIDEOS:
        _pending_videos.pop(next(iter(_pending_videos)))


async def _claim_video(query):
    """Answer the callback, delete the selection message, and return its stored entry.

    Returns the ``(link, video_info)`` tuple saved for the message the button
    belongs to, or ``None`` (after showing an alert) when nothing is stored for it.
    """
    key = (query.message.chat.id, query.message.message_id)
    entry = _pending_videos.pop(key, None)
    if entry is None:
        await query.answer("This selection has expired. Please send the link again.", show_alert=True)
    else:
        # Acknowledge the press so the client stops showing a progress indicator.
        await query.answer()
    await query.message.delete()
    return entry


def _download(link, itag=None):
    """Download one stream of *link* into memory (blocking; run it in an executor).

    With ``itag=None`` the audio-only stream is used, otherwise the stream with
    that itag. Returns ``(title, stream, data)``; raises LookupError when the
    requested stream does not exist.
    """
    video = YouTube(link, on_progress_callback=on_progress)
    streams = video.streams
    stream = streams.get_audio_only() if itag is None else streams.get_by_itag(itag)
    if stream is None:
        raise LookupError("The requested format is not available.")
    with BytesIO() as buffer:
        stream.stream_to_buffer(buffer)
        data = buffer.getvalue()
    return video.title, stream, data


async def _report_failure(query, loading, error):
    """Remove the "Downloading..." notice if it still exists and show *error* to the user."""
    try:
        await loading.delete()
    except Exception as e_delete:
        # Expected when the failure happened after the notice was already deleted.
        logging.warning("Error deleting loading message: %s", e_delete)
    # The text is sent with HTML parse mode, so the error must be escaped.
    await query.message.answer(f"β An error occurred: {html.quote(str(error))}")


@dp.message(F.text.startswith("https://youtu"))
async def getlink_handler(message: Message) -> None:
    """Look up the video behind a YouTube link and offer the available download formats."""
    link = message.text
    # get_video_info is a plain synchronous call (presumably network-bound);
    # run it in the default executor so other updates keep being processed.
    video_info = await asyncio.get_running_loop().run_in_executor(None, get_video_info, link)
    await message.delete()
    if video_info is None:
        await message.answer("Error: Unable to get video information.")
        return

    # Title and uploader are untrusted text going into an HTML caption.
    caption_text = (f"πΉ {html.quote(video_info['title'])}\n"
                    f"π€ <a href='{video_info['channel_url']}'>@{html.quote(video_info['uploader'])}</a>\n\n"
                    f"π₯ Download formats:\n\n")
    sent = await message.answer_photo(
        video_info['thumbnail'],
        caption=caption_text,
        reply_markup=format_kb(video_info['formats']),
    )
    _remember_video(sent, link, video_info)


@dp.callback_query(F.data == "format_thumbnail")
async def thumbnail_callback_handler(query: types.CallbackQuery) -> None:
    """Send the video's thumbnail as a document in place of the selection message."""
    entry = await _claim_video(query)
    if entry is None:
        return
    _, video_info = entry
    await query.message.answer_document(
        document=URLInputFile(
            url=video_info['thumbnail'],
            filename=f"{video_info['title']}.jpg"
        ),
        caption=f"πΉ {html.quote(video_info['title'])}\n\nThumbnail downloaded by @mega_youtube_downloader_bot",
        thumbnail=URLInputFile(video_info['thumbnail'])
    )


@dp.callback_query(F.data == "format_audio")
async def audio_callback_handler(query: types.CallbackQuery) -> None:
    """Download the audio-only stream of the selected video and send it as an audio file."""
    entry = await _claim_video(query)
    if entry is None:
        return
    link, video_info = entry
    loading = await query.message.answer("β οΈ Video size is large. Please wait for a while. \n\nDownloading...")
    try:
        # The download is blocking, so it runs in the default executor.
        title, _, payload = await asyncio.get_running_loop().run_in_executor(None, _download, link)
        audio_file = BufferedInputFile(payload, filename=f"{title}.mp3")
        await loading.delete()
        async with ChatActionSender(bot=bot, chat_id=query.from_user.id, action="upload_audio"):
            await bot.send_audio(
                chat_id=query.from_user.id, audio=audio_file,
                caption=f"π΅ {html.quote(title)} \n\nAudio downloaded by @mega_youtube_downloader_bot",
                thumbnail=URLInputFile(video_info['thumbnail']),
                title=title, performer=video_info['uploader']
            )
    except Exception as e:
        await _report_failure(query, loading, e)


@dp.callback_query(F.data.startswith("format_"))
async def format_callback_handler(query: types.CallbackQuery) -> None:
    """Download the stream whose itag follows ``format_`` in the callback data and send it as a video."""
    format_id = query.data.split("_")[1]
    entry = await _claim_video(query)
    if entry is None:
        return
    link, video_info = entry
    loading = await query.message.answer("β οΈ Video size is large. Please wait for a while. \n\nDownloading...")
    try:
        # int() stays inside the try so a malformed itag is reported like any other failure.
        title, stream, payload = await asyncio.get_running_loop().run_in_executor(
            None, _download, link, int(format_id)
        )
        video_file = BufferedInputFile(payload, filename=f"{title}.mp4")
        await loading.delete()
        async with ChatActionSender(bot=bot, chat_id=query.from_user.id, action="upload_video"):
            await bot.send_video(
                chat_id=query.from_user.id,
                video=video_file,
                caption=f"πΉ {html.quote(title)} \n\nVideo downloaded by @mega_youtube_downloader_bot",
                thumbnail=URLInputFile(video_info['thumbnail']),
                width=stream.width,
                supports_streaming=True
            )
    except Exception as e:
        await _report_failure(query, loading, e)
@dp.message(F.text == admin_send_btn_text, StateFilter(None))
async def admin_send_handler(message: Message, state: FSMContext) -> None:
    """Start the broadcast flow for an admin: ask for the message and wait for it."""
    sender_is_admin = str(message.chat.id) in ADMINS
    if sender_is_admin:
        await message.answer("π€ Send a message to users.")
        # The next message from this admin is handled in the SendMessages.message state.
        await state.set_state(SendMessages.message)
@dp.message(SendMessages.message)
async def send_message_handler(message: Message, state: FSMContext) -> None:
    """Keep the admin's broadcast message in FSM data and ask for confirmation."""
    if str(message.chat.id) not in ADMINS:
        return
    # NOTE(review): the Message object itself is stored in FSM data; this
    # presumably relies on an in-memory storage backend. Confirm before
    # switching to a storage that serialises its data.
    await state.update_data(message=message)
    await message.answer("✅ Confirm the message.", reply_markup=admin_confirm_kb)
    await state.set_state(SendMessages.confirm)
@dp.callback_query(F.data.startswith("send_"))
async def send_confirm_handler(query: types.CallbackQuery, state: FSMContext) -> None:
    """Handle the admin's confirm/cancel choice for a pending broadcast.

    On ``send_confirm`` the stored message is copied to every active user, one
    status line per user is reported back to the admin, and a summary with the
    user counts is sent at the end. Any other ``send_*`` value just cancels.
    In both cases the confirmation message is removed and the FSM state cleared.

    A user is marked inactive only when Telegram refuses delivery to them
    (TelegramForbiddenError); other failures are reported without changing
    the user's status.
    """
    # Local import: these names are not imported at the top of the file.
    from aiogram.exceptions import TelegramForbiddenError, TelegramRetryAfter

    if str(query.from_user.id) not in ADMINS:
        return

    await query.answer()
    data = await state.get_data()
    message = data.get("message")
    # Drop the pending broadcast and its button first, so a second press
    # cannot start the same broadcast again while this one is running.
    await state.clear()
    await query.message.delete()

    if query.data != "send_confirm":
        return
    if message is None:
        # Without this guard every copy attempt would fail and each failure
        # would be treated as a delivery error for that user.
        await query.message.answer("Nothing to send: the stored message was not found. Please start again.")
        return

    await query.message.answer("π€ Sending message to all users...")
    # Collect the ids up front so the updates below do not run while the
    # select is still being iterated.
    chat_ids = [user.chat_id for user in User.select().where(User.is_active == True)]
    for chat_id in chat_ids:
        try:
            try:
                await message.copy_to(chat_id=chat_id)
            except TelegramRetryAfter as flood:
                # Flood control: wait as long as Telegram asks, then retry once.
                await asyncio.sleep(flood.retry_after)
                await message.copy_to(chat_id=chat_id)
        except TelegramForbiddenError as e:
            User.update(is_active=False).where(User.chat_id == chat_id).execute()
            logging.warning("Error sending message to %s: %s", chat_id, e)
            status = f"β Error sending message to {chat_id}: {html.quote(str(e))}"
        except Exception as e:
            logging.warning("Error sending message to %s: %s", chat_id, e)
            status = f"β Error sending message to {chat_id}: {html.quote(str(e))}"
        else:
            status = f"✅ Message sent to {chat_id}"

        # Reporting happens outside the delivery try-block: a failure to
        # notify the admin says nothing about the recipient.
        try:
            await query.message.answer(status)
        except Exception:
            logging.exception("Failed to report broadcast status for %s", chat_id)
        await asyncio.sleep(0.30)

    users = User.select().count()
    inactive_users = User.select().where(User.is_active == False).count()
    active_users = User.select().where(User.is_active == True).count()
    await query.message.answer(
        f"✅ Message sent to all users. \n\nUsers: {users} \nActive users: {active_users} \nInactive users: {inactive_users}")
async def main() -> None:
    """Prepare the database, register the bot's command menu, and start polling."""
    # Create the table, run migrations, then close the connection again.
    db.connect()
    db.create_tables([User])
    migrate_db()
    db.close()

    menu = [
        types.BotCommand(command=name, description=text)
        for name, text in (
            ("/start", "Start the bot"),
            ("/dev", "Developer information"),
        )
    ]
    await bot.delete_webhook()
    await bot.set_my_commands(menu)
    await dp.start_polling(bot)
if __name__ == "__main__":
    # Log at INFO level to stdout, then run the bot until main() returns.
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    asyncio.run(main())