-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
154 lines (126 loc) · 5.98 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import logging
import time
import backoff
import toml
from telethon import TelegramClient
from telethon.tl.types import MessageMediaDocument
from telethon.errors import SessionPasswordNeededError
from telethon.errors import SessionExpiredError
import os
import asyncio
_logger = logging.getLogger(__name__)

# Probe a throwaway ``backoff.expo()`` generator once at import time: when the
# first value it yields is ``None`` the generator has to be primed before it
# produces real delays (the flag name suggests this is the backoff 2.x
# behaviour -- confirm against the installed version).
_is_backoff_v2 = next(backoff.expo()) is None


def create_exp_backoff_generator(*args, **kwargs):
    """Return a ``backoff.expo`` generator that is ready to yield delays.

    All positional and keyword arguments are forwarded unchanged to
    ``backoff.expo``.  When the import-time probe found that such generators
    start with a ``None`` placeholder, that placeholder is consumed here so
    the first value the caller sees is a delay.
    """
    delays = backoff.expo(*args, **kwargs)
    if not _is_backoff_v2:
        return delays
    # Swallow the leading placeholder value.
    delays.send(None)
    return delays
class TelegramDownloader:
    """Download document attachments from matching Telegram dialogs.

    Dialogs are matched by case-insensitive substring against
    ``target_channels``; only documents whose file name ends with one of
    ``filter_extensions`` are downloaded into ``download_dir``.
    """

    def __init__(self, phone, api_id, api_hash, target_channels, download_dir, filter_extensions, num_threads=3):
        """Create the Telegram client and prepare the download directory.

        Args:
            phone: Phone number; also passed as the first ``TelegramClient``
                argument.
            api_id: Telegram API id passed to ``TelegramClient``.
            api_hash: Telegram API hash passed to ``TelegramClient``.
            target_channels: Iterable of name fragments; a dialog is selected
                when any fragment occurs in its name (case-insensitive).
            download_dir: Directory that receives the files; created if it
                does not exist.
            filter_extensions: A single suffix or an iterable of suffixes a
                file name must end with to be downloaded.
            num_threads: Maximum number of downloads kept in flight at once.
        """
        self.client = TelegramClient(phone, api_id, api_hash)
        self.target_channels = target_channels
        self.download_dir = download_dir
        if not os.path.exists(self.download_dir):
            # exist_ok avoids a crash if the directory appears between the
            # check above and the creation below.
            os.makedirs(self.download_dir, exist_ok=True)
            _logger.info(f"Created directory: {self.download_dir}")
        self.phone = phone
        # A bare string would otherwise be split into single characters by
        # tuple(), turning ".pdf" into ('.', 'p', 'd', 'f').
        if isinstance(filter_extensions, str):
            filter_extensions = (filter_extensions,)
        self.filter_extensions = tuple(filter_extensions)
        self.num_threads = num_threads
        # Upper bound (seconds) of the exponential backoff; reaching it ends
        # the retry loop in download_file().
        self.max_retry_time = 64

    async def start(self):
        """Start the client, make sure the user is signed in, then download.

        If the client is still unauthorized after starting, a login code is
        requested and read from stdin (followed by the password when the
        account requires one).  Any other sign-in failure is logged and the
        method returns without processing channels.
        """
        await self.client.start(self.phone)
        _logger.info("Client Created")
        # Ensure authorization
        if not await self.client.is_user_authorized():
            await self.client.send_code_request(self.phone)
            try:
                await self.client.sign_in(self.phone, input('Enter the code: '))
            except SessionPasswordNeededError:
                await self.client.sign_in(password=input('Password: '))
            except Exception as e:
                _logger.error(f"Failed to authenticate: {e}")
                return
        await self.process_channels()

    async def download_file(self, message):
        """Download the document attached to ``message`` with retries.

        A file that already exists locally with the same size is skipped; one
        with a different size is removed and downloaded again.  Failed
        attempts are retried after exponentially growing delays until the
        delay reaches ``self.max_retry_time``.

        Args:
            message: Message object exposing ``file.name``, ``file.size`` and
                ``download_media``.
        """
        file_name = message.file.name
        file_path = os.path.join(self.download_dir, file_name)

        def progress_callback(current, total):
            # Guard against a zero-sized file so the callback cannot raise.
            percent = (current / total) * 100 if total else 100.0
            print(f'\rDownloading {file_name}: {current}/{total} bytes ({percent:.1f}%)',
                  end='')

        # max_value must be passed by keyword: the first positional parameter
        # of backoff.expo is the exponent base, which would make the second
        # delay equal to max_retry_time and end the loop after one attempt.
        for delay in create_exp_backoff_generator(max_value=self.max_retry_time):
            if delay == self.max_retry_time:
                _logger.error('Max retries exceeded download file: {}'.format(message))
                break
            try:
                # Check if the file exists and compare sizes
                if os.path.exists(file_path):
                    local_file_size = os.path.getsize(file_path)
                    if local_file_size == message.file.size:
                        _logger.info(f'File {file_name} already exists and is of the same size. Skipping download.')
                        break  # Skip re-download
                    _logger.info(f'File {file_name} exists but sizes differ. Removing and re-downloading.')
                    os.remove(file_path)
                _logger.info(f'Downloading {file_name}')
                path = await message.download_media(file=self.download_dir, progress_callback=progress_callback)
                _logger.info(f'Downloaded {path}')
                break  # Break the retry loop on successful download
            except Exception as e:
                _logger.error(f'Error downloading {file_name}: {e}')
                await asyncio.sleep(delay)  # Use asyncio.sleep for async delay

    async def download_files_from_channel(self, channel):
        """Download every matching document found in ``channel``.

        Each matching message gets its own download task; once
        ``self.num_threads`` tasks are pending, iteration pauses until at
        least one of them finishes.

        Args:
            channel: Entity passed to ``self.client.iter_messages``.
        """
        tasks = set()
        async for message in self.client.iter_messages(channel):
            if message.media and isinstance(message.media, MessageMediaDocument):
                file_name = message.file.name
                if file_name and file_name.endswith(self.filter_extensions):
                    tasks.add(asyncio.create_task(self.download_file(message)))
                    if len(tasks) >= self.num_threads:
                        # Keep only the still-pending tasks.
                        _, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        if tasks:
            await asyncio.wait(tasks)

    async def process_channels(self):
        """Scan all dialogs and download from those matching a target name.

        On ``SessionExpiredError`` the whole ``start()`` sequence is run
        again.
        """
        try:
            async for dialog in self.client.iter_dialogs():
                if any(target_name.lower() in dialog.name.lower() for target_name in self.target_channels):
                    _logger.info(f'Found channel: {dialog.name}')
                    await self.download_files_from_channel(dialog.entity)
        except SessionExpiredError:
            _logger.error('Session expired. Attempting to re-authenticate.')
            # NOTE(review): start() calls process_channels() again, so this
            # recurses and the completion message below is logged once per
            # level -- confirm this is intended.
            await self.start()
        _logger.info('Download complete!')
async def main():
    """Load ``config/config.toml``, validate it and run the downloader.

    Raises:
        SystemExit: With status 1 when the configuration file cannot be
            loaded or one of the required keys is missing.
    """
    # Load configuration
    try:
        config = toml.load("config/config.toml")
    except Exception as e:
        _logger.error(f'Failed to load config.toml: {e}')
        # The ``exit`` helper comes from the ``site`` module and is not
        # guaranteed to exist; raising SystemExit has the same effect.
        raise SystemExit(1)

    # validate config
    required_keys = ['phone', 'api_id', 'api_hash', 'target_channel_names', 'download_directory',
                     'filter_extensions', 'num_threads']
    missing_keys = [key for key in required_keys if key not in config]
    if missing_keys:
        _logger.error(f'Missing configuration keys: {", ".join(missing_keys)}')
        raise SystemExit(1)

    download_directory = config['download_directory']
    # Ensure download directory exists (no error if it is already there)
    os.makedirs(download_directory, exist_ok=True)

    downloader = TelegramDownloader(config['phone'], config['api_id'], config['api_hash'],
                                    config['target_channel_names'], download_directory,
                                    config['filter_extensions'], config['num_threads'])
    await downloader.start()
if __name__ == '__main__':
    # Log every record to both app.log and the console in one shared format.
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    fh = logging.FileHandler('app.log')
    fh.setLevel(logging.INFO)
    fh.setFormatter(formatter)

    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    ch.setFormatter(formatter)

    # Hand the handlers to basicConfig() so it does not install its own
    # default console handler; adding ``ch`` on top of that default handler
    # printed every record to the console twice.
    logging.basicConfig(level=logging.INFO, handlers=[fh, ch])
    logger = logging.getLogger()

    # run main app
    asyncio.run(main())