-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathget_directory_contents.py
More file actions
187 lines (163 loc) · 8.27 KB
/
get_directory_contents.py
File metadata and controls
187 lines (163 loc) · 8.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import requests
from collections import namedtuple
import zipfile
import tarfile
import py7zr
import io
import os
import logging
from collections import defaultdict
from source import TOKEN
logger = logging.getLogger(__name__)
FileData = namedtuple('FileData', ['owner', 'repo_name', 'file_name', 'file_content'])
def get_extension(filename: str) -> str:
"""Извлекает расширение из имени файла."""
basename = os.path.basename(filename)
parts = basename.split('.')
return parts[-1].lower() if len(parts) > 1 else 'no_extension'
def count_extensions(results: list[FileData]) -> dict[str, int]:
"""Подсчитывает количество файлов по расширениям."""
counts = defaultdict(int)
for file_data in results:
ext = get_extension(file_data.file_name)
counts[ext] += 1
return dict(counts)
def get_directory_contents(owner: str, repo_name: str, directory_path='', results=None):
if results is None:
results = []
headers = {'Authorization': f'Bearer {TOKEN}'}
url = f'https://api.github.com/repos/{owner}/{repo_name}/contents/{directory_path}'
response = requests.get(url, headers=headers)
if response.status_code == 200:
content_data = response.json()
for content_item in content_data:
if content_item['type'] == 'file':
file_name = content_item['name']
file_content_url = content_item['download_url']
file_response = requests.get(file_content_url)
if file_response.status_code == 200:
file_content = file_response.content
if file_name.endswith('.zip'):
try:
extract_zip(file_content, owner, repo_name, results)
except zipfile.BadZipFile:
logger.error(
f'Файл {file_name} не является корректным zip-архивом'
)
except RuntimeError as e:
logger.error(
f'Ошибка при обработке zip-архива {file_name}: {e}'
)
elif file_name.endswith('.tar.gz') or file_name.endswith('.tgz'):
try:
extract_tar(file_content, owner, repo_name, results)
except tarfile.ReadError:
logger.error(
f'Файл {file_name} не является корректным tar-архивом'
)
elif file_name.endswith('.7z'):
try:
extract_7z(file_content, owner, repo_name, results)
except py7zr.PasswordRequired:
logger.error(
f'Файл {file_name} требует пароль для распаковки'
)
except py7zr.Bad7zFile:
logger.error(
f'Файл {file_name} не является корректным 7z-архивом'
)
except py7zr.UnsupportedCompressionMethodError:
logger.error(
f'Файл {file_name} использует неподдерживаемый метод сжатия'
)
else:
try:
file_data = FileData(
owner, repo_name, file_name, file_response.text
)
results.append(file_data)
except UnicodeDecodeError:
logger.error(f'Ошибка при декодировании файла {file_name}')
except Exception as e:
logger.error(f'Ошибка при обработке файла {file_name}: {e}')
else:
logger.error(
f'Ошибка при получении файла {file_name}: {file_response.status_code}'
)
elif content_item['type'] == 'dir':
dir_name = content_item['name']
get_directory_contents(
owner, repo_name, f'{directory_path}/{dir_name}', results
)
else:
logger.error(
f'Ошибка при получении содержимого репозитория {owner}/{repo_name}: {response.status_code}'
)
logger.debug(count_extensions(results))
return results
def extract_zip(file_content, owner, repo_name, results):
with zipfile.ZipFile(io.BytesIO(file_content)) as zip_ref:
for zip_info in zip_ref.infolist():
if not zip_info.is_dir():
try:
with zip_ref.open(zip_info) as file:
try:
file_data = FileData(
owner,
repo_name,
zip_info.filename,
file.read().decode('utf-8'),
)
results.append(file_data)
except UnicodeDecodeError:
logger.warning(
f'Ошибка при декодировании файла {zip_info.filename}'
)
except Exception as e:
logger.error(
f'Ошибка при обработке файла {zip_info.filename}: {e}'
)
except RuntimeError as e:
logger.error(f'Ошибка при обработке файла {zip_info.filename}: {e}')
def extract_tar(file_content, owner, repo_name, results):
with tarfile.open(fileobj=io.BytesIO(file_content), mode='r:gz') as tar_ref:
for tar_info in tar_ref.getmembers():
if tar_info.isfile():
try:
file = tar_ref.extractfile(tar_info)
if file:
try:
file_data = FileData(
owner,
repo_name,
tar_info.name,
file.read().decode('utf-8'),
)
results.append(file_data)
except UnicodeDecodeError:
logger.warning(
f'Ошибка при декодировании файла {tar_info.name}'
)
except Exception as e:
logger.error(
f'Ошибка при обработке файла {tar_info.name}: {e}'
)
except Exception as e:
logger.error(f'Ошибка при распаковке файла {tar_info.name}: {e}')
def extract_7z(file_content, owner, repo_name, results):
try:
with py7zr.SevenZipFile(io.BytesIO(file_content), mode='r') as archive:
for name, bio in archive.readall().items():
try:
file_data = FileData(
owner, repo_name, name, bio.read().decode('utf-8')
)
results.append(file_data)
except UnicodeDecodeError:
logger.warning(f'Ошибка при декодировании файла {name}')
except Exception as e:
logger.error(f'Ошибка при обработке файла {name}: {e}')
except py7zr.exceptions.UnsupportedCompressionMethodError:
logger.error('Файл использует неподдерживаемый метод сжатия')
except Exception as e:
logger.error(f'Ошибка при обработке 7z-архива: {e}')