-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgcs_buckets.py
executable file
·98 lines (79 loc) · 3.24 KB
/
gcs_buckets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/env python3
from oauth2client.client import GoogleCredentials
from asyncio import run, gather, create_task
from aiohttp import ClientSession
import yaml
async def make_async_call(url: str, access_token: str, key: str = 'items') -> list:
    """Fetch every page of a paginated list endpoint and merge the entries.

    Sends authenticated GET requests to ``url``, following the
    ``nextPageToken`` field of each JSON response until no token is
    returned, and concatenates the entries stored under ``key``.

    Args:
        url: Endpoint to query.
        access_token: OAuth2 token sent as a ``Bearer`` Authorization header.
        key: Name of the JSON field that holds each page's entries.

    Returns:
        The combined entries of all pages. This is best-effort: if any
        request or decode step fails, the error is printed and an empty
        list is returned instead of raising.
    """
    results = []
    headers = {'Authorization': f"Bearer {access_token}"}
    try:
        # raise_for_status=True makes 4xx/5xx responses raise, so they end up
        # in the handler below rather than in the status check.
        async with ClientSession(raise_for_status=True) as session:
            page_token = None
            while True:
                params = {'pageToken': page_token} if page_token else {}
                async with session.get(url=url, headers=headers, params=params) as response:
                    if int(response.status) != 200:
                        # A non-200 success status carries no page to read;
                        # stop here instead of re-requesting the same page
                        # forever.
                        break
                    data = await response.json()
                results.extend(data.get(key, []))
                page_token = data.get('nextPageToken')
                if not page_token:
                    break
        return results
    except Exception as e:
        # Report the failure instead of swallowing it silently. str(e) is
        # used (not repr) so request headers are not echoed to the output.
        print(f"Request to {url} failed: {e}")
        return []
async def get_projects(access_token: str) -> dict:
    """Build a project-number -> project-ID lookup from Resource Manager.

    Args:
        access_token: OAuth2 bearer token used for the API call.

    Returns:
        A dict keyed by integer ``projectNumber`` whose values are the
        matching ``projectId`` strings.

    Raises:
        SystemExit: If the listing cannot be turned into the lookup (for
            example an entry lacks one of the two fields); the underlying
            error is passed as the exit argument.
    """
    try:
        url = "https://cloudresourcemanager.googleapis.com/v1/projects"
        projects = await make_async_call(url, access_token, key='projects')
        return {
            int(project['projectNumber']): project['projectId']
            for project in projects
        }
    except Exception as e:
        # quit() is an interactive helper injected by the site module and is
        # not guaranteed to exist; raising SystemExit is the equivalent that
        # always works.
        raise SystemExit(e) from e
async def get_buckets(project_id: str, access_token: str) -> list:
    """List the Cloud Storage buckets that belong to one project.

    Args:
        project_id: ID of the project whose buckets are requested.
        access_token: OAuth2 bearer token used for the API call.

    Returns:
        Whatever ``make_async_call`` returns for the bucket listing, or an
        empty list if an exception is raised (the exception is printed).
    """
    buckets = []
    try:
        endpoint = f"https://storage.googleapis.com/storage/v1/b?project={project_id}"
        buckets = await make_async_call(endpoint, access_token)
    except Exception as err:
        print(err)
    return buckets
async def get_objects(bucket_name: str, access_token: str):
    """Count the objects in a bucket and add up their reported sizes.

    Args:
        bucket_name: Name of the bucket to inspect.
        access_token: OAuth2 bearer token used for the API call.

    Returns:
        A ``(num_objects, total_size)`` tuple, where ``total_size`` is the
        sum of each object's ``size`` field (a missing field counts as 0).
        If anything fails, the error is printed and ``(0, 0)`` is returned
        so that callers can always unpack two values.
    """
    try:
        url = f"https://storage.googleapis.com/storage/v1/b/{bucket_name}/o?prefix="
        objects = await make_async_call(url, access_token)
        # The 'size' field is converted with int() before summing.
        total_size = sum(int(obj.get('size', 0)) for obj in objects)
        return len(objects), total_size
    except Exception as e:
        print(e)
        # Keep the same two-element shape as the success path; an empty list
        # here would break tuple unpacking at the call site.
        return 0, 0
async def main(project_ids=None):
    """Print a one-line summary dict for every bucket in the chosen projects.

    Obtains application-default credentials, resolves the project
    number -> ID lookup, lists the buckets of each selected project
    concurrently, then prints name, location, storage class, owning
    project ID, object count and total size for each bucket in turn.

    Args:
        project_ids: Optional iterable of project IDs to scan. When omitted,
            the single project this script has always been pinned to is
            used, so ``main()`` behaves as before.

    Raises:
        SystemExit: On any failure; the underlying error is passed as the
            exit argument.
    """
    try:
        creds = GoogleCredentials.get_application_default()
        access_token = creds.get_access_token().access_token
        projects = await get_projects(access_token)

        if project_ids is None:
            # Previously hard-coded selection. The original code left
            # projects.values() as a commented-out alternative for scanning
            # every listed project.
            project_ids = ["otc-core-network-prod-4aea"]

        tasks = [
            create_task(get_buckets(project_id, access_token))
            for project_id in project_ids
        ]
        buckets = []
        for project_buckets in await gather(*tasks):
            buckets.extend(project_buckets)

        for bucket in buckets:
            summary = {
                'name': bucket.get('name'),
                'location': bucket.get('location', 'UNKNOWN').lower(),
                'class': bucket.get('storageClass', 'UNKNOWN'),
                # Buckets carry only the project number; translate it back
                # to the ID, falling back to "error" for unknown numbers.
                'project_id': projects.get(int(bucket['projectNumber']), "error"),
            }
            summary['num_objects'], summary['total_size'] = await get_objects(summary['name'], access_token)
            print(summary)
    except Exception as e:
        # quit() comes from the site module and is meant for interactive
        # use; SystemExit is the equivalent that is always available.
        raise SystemExit(e) from e
# Start the event loop only when this file is executed as a script.
if __name__ == "__main__":
    entry_point = main()
    run(entry_point)