forked from ubinux/get_spdx
-
Notifications
You must be signed in to change notification settings - Fork 0
/
generate_spdx
executable file
·339 lines (299 loc) · 12.9 KB
/
generate_spdx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
#!/usr/bin/python3
# encoding: utf-8
import argparse
import os
import time
import subprocess
from argparse import RawTextHelpFormatter
parser = argparse.ArgumentParser(description='Upload the OSS archives to fosslogy server and download spdx files.', formatter_class=RawTextHelpFormatter)
parser.add_argument('-c', '--config', help='$TOEKN@fossology_server\n\
By default:\n\
$TOKEN will be read from environment.\n\
fossology_server is 127.0.0.1')
parser.add_argument('src', help="The OSS archive or directory includes OSS archives that want to be scanned.")
parser.add_argument('-d', '--dst', default=None, help="The directory where spdx files be output. By default, spdx file will output into the current directory.")
parser.add_argument('-f', '--folder', default=None, help="Which directory on fossology server where OSS archives want to be uploaded.")
parser.add_argument('-r', '--reuse', action="store_true", help="Reuse prior results from an earlier scan.")
args = parser.parse_args()
"""Default config"""
#By default, fossology server is 127.0.0.1
url = "127.0.0.1"
#By default, spdx files will be output into `pwd`
dstPath = os.path.abspath('./')
#By default, archive will be uploaded to "Software Repository"folder
folder = "Software Repository"
folder_id = 1
#By default, previous data of fossology will not be used.
reuse = False
delaytime = 30
if args.config is not None:
[token, url] = args.config.split('@',1)
else:
token = os.environ['TOKEN']
if token == None:
raise Exception("Please input token of fossology server.\n")
srcPath = os.path.abspath(args.src)
if os.path.exists(srcPath) != True:
raise Exception("%s is not exist.\n", srcPath)
if args.dst is not None:
dstPath = os.path.abspath(args.dst)
if os.path.exists(dstPath) == False:
os.makedirs(dstPath)
if args.reuse:
reuse = True
"""Check if the folder is exist"""
if args.folder is not None:
folder = args.folder
folder_id = 0
rest_api_cmd = "curl -k -s -S -X GET http://" + url + ":8081/repo/api/v1/folders" \
+ " -H \"Authorization: Bearer " + token + "\"" \
+ " --noproxy 127.0.0.1"
print("Get folder list ")
#print("Invoke rest_api_cmd = " + rest_api_cmd )
try:
has_uploaded = subprocess.check_output(rest_api_cmd, stderr=subprocess.STDOUT, shell=True)
except subprocess.CalledProcessError as e:
raise Exception("Get folder list fail. Please check your fossplogy.\n")
has_uploaded = str(has_uploaded, encoding = "utf-8")
has_uploaded = has_uploaded.replace("null", "None")
print("Get uploaded as following: ")
print(has_uploaded)
has_uploaded = eval(has_uploaded)
if len(has_uploaded) == 0:
raise Exception("Didn't get folder list. Please check your fossology server.\n")
for i in range(0, len(has_uploaded)):
if has_uploaded[i]["name"] == folder:
folder_id = has_uploaded[i]["id"]
print("%s folder has exist.\n" % folder)
break
if folder_id == 0:
print("%s folder is not exist. So, create it.\n" % folder)
rest_api_cmd = "curl -k -s -S -X POST http://" + url + ":8081/repo/api/v1/folders" \
+ " -H \'parentFolder: 1\'" \
+ " -H \'folderName: " + folder + "\'" \
+ " -H \"Authorization: Bearer " + token + "\"" \
+ " --noproxy 127.0.0.1"
print("Create folder.")
#print("Invoke rest_api_cmd = " + rest_api_cmd )
try:
create_folder = subprocess.check_output(rest_api_cmd, stderr=subprocess.STDOUT, shell=True)
except subprocess.CalledProcessError as e:
raise Exception("Create folder fail. Please check your fossplogy.\n")
create_folder = str(create_folder, encoding = "utf-8")
create_folder = eval(create_folder)
#print("create_folder = ")
print(create_folder)
if str(create_folder["code"]) == "201":
folder_id = create_folder["message"]
else:
raise Exception("Create folder fail. Please check your fossology server.\n")
def has_uploaded(tar_file):
"""
Check whether this src has been upload before.
"""
(work_dir, file_name) = os.path.split(tar_file)
rest_api_cmd = "curl -k -s -S -X GET http://" + url + ":8081/repo/api/v1/uploads" \
+ " -H \'folderName: " + folder + "\'" \
+ " -H \"Authorization: Bearer " + token + "\"" \
+ " --noproxy 127.0.0.1"
print("Check if %s has uploaded." % file_name)
#print("Invoke rest_api_cmd = " + rest_api_cmd )
try:
has_uploaded_ret = subprocess.check_output(rest_api_cmd, stderr=subprocess.STDOUT, shell=True)
except subprocess.CalledProcessError as e:
raise Exception("Get uploaded info fail. Please check your fossplogy.\n")
has_uploaded_ret = str(has_uploaded_ret, encoding = "utf-8")
has_uploaded_ret = eval(has_uploaded_ret)
if len(has_uploaded_ret) == 0:
print("No archive has been uploaded.")
return False
for i in range(0, len(has_uploaded_ret)):
if has_uploaded_ret[i]["uploadname"] == file_name:
if str(os.path.getsize(tar_file)) == str(has_uploaded_ret[i]["filesize"]) and str(has_uploaded_ret[i]["folderid"]) == str(folder_id):
print("Find " + file_name + " in fossology server \"Software Repository\" folder. So, reuse previous data.")
return has_uploaded_ret[i]["id"]
print("%s has not been uploaded to %s folder before." % (tar_file, folder))
return False
def upload(tar_file):
i = 0
rest_api_cmd = "curl -k -s -S -X POST http://" + url + ":8081/repo/api/v1/uploads" \
+ " -H \"folderId: " + str(folder_id) + "\"" \
+ " -H \"Authorization: Bearer " + token + "\"" \
+ " -H \'uploadDescription: created by REST\'" \
+ " -H \'public: public\'" \
+ " -H \'Content-Type: multipart/form-data\'" \
+ " -F \'fileInput=@\"" + tar_file + "\";type=application/octet-stream\'" \
+ " --noproxy 127.0.0.1"
print("Upload %s." % tar_file)
#print("Invoke rest_api_cmd = " + rest_api_cmd )
while i < 10:
time.sleep(delaytime)
try:
upload = subprocess.check_output(rest_api_cmd, stderr=subprocess.STDOUT, shell=True)
except subprocess.CalledProcessError as e:
raise Exception("Upload %s fail. Please check your fossplogy.\n" % tar_file)
upload = str(upload, encoding = "utf-8")
print("Upload : ")
print(upload)
upload = eval(upload)
if str(upload["code"]) == "201":
#print("Upload Id = %s " % upload["message"])
return upload["message"]
i += 1
raise Exception("Upload %s fail, please check your fossology server.\n" % tar_file)
return False
def analysis(upload_id):
i = 0
rest_api_cmd = "curl -k -s -S -X POST http://" + url + ":8081/repo/api/v1/jobs" \
+ " -H \"folderId: " + str(folder_id) + "\"" \
+ " -H \"uploadId: " + str(upload_id) + "\"" \
+ " -H \"Authorization: Bearer " + token + "\"" \
+ " -H \'Content-Type: application/json\'" \
+ " --data \'{\"analysis\": {\"bucket\": true,\"copyright_email_author\": true,\"ecc\": true, \"keyword\": true,\"mime\": true,\"monk\": true,\"nomos\": true,\"package\": true},\"decider\": {\"nomos_monk\": true,\"bulk_reused\": true,\"new_scanner\": true}}\'" \
+ " --noproxy 127.0.0.1"
print("Analysis upload_id :%s" % upload_id)
#print("Invoke rest_api_cmd = " + rest_api_cmd )
while i < 10:
try:
time.sleep(delaytime)
analysis = subprocess.check_output(rest_api_cmd, stderr=subprocess.STDOUT, shell=True)
except subprocess.CalledProcessError as e:
raise Exception("Analysis fail!")
time.sleep(delaytime)
analysis = str(analysis, encoding = "utf-8")
print("analysis : ")
print(analysis)
analysis = eval(analysis)
if str(analysis["code"]) == "201":
return analysis["message"]
elif str(analysis["code"]) == "404":
print("analysis is still not complete.")
time.sleep(delaytime)
else:
return False
i += 1
time.sleep(delaytime*i)
print("Analysis is fail, will try again.")
raise Exception("Analysis is fail, please check your fossology server.")
def trigger(upload_id):
i = 0
rest_api_cmd = "curl -k -s -S -X GET http://" + url + ":8081/repo/api/v1/report" \
+ " -H \"Authorization: Bearer " + token + "\"" \
+ " -H \"uploadId: " + str(upload_id) + "\"" \
+ " -H \'reportFormat: spdx2tv\'" \
+ " --noproxy 127.0.0.1"
print("Trigger upload_id :%s" % upload_id)
#print("Invoke rest_api_cmd = " + rest_api_cmd )
while i < 4:
time.sleep(delaytime)
try:
trigger = subprocess.check_output(rest_api_cmd, stderr=subprocess.STDOUT, shell=True)
except subprocess.CalledProcessError as e:
raise Exception("Trigger failed: \n%s" % e.output.decode("utf-8"))
time.sleep(delaytime)
trigger = str(trigger, encoding = "utf-8")
trigger = eval(trigger)
print("trigger result : ")
print(trigger)
if str(trigger["code"]) == "201":
return trigger["message"].split("/")[-1]
i += 1
time.sleep(delaytime*4)
print("Trigger is fail, will try again.")
raise Exception("Trigger is fail, please check your fossology server.")
def get_spdx(report_id, spdx_file):
empty = True
i = 0
rest_api_cmd = "curl -k -s -S -X GET http://" + url + ":8081/repo/api/v1/report/" + report_id \
+ " -H \'accept: text/plain\'" \
+ " -H \"Authorization: Bearer " + token + "\"" \
+ " --noproxy 127.0.0.1"
print("create spdx file for report :%s." % report_id)
#print("Invoke rest_api_cmd = " + rest_api_cmd )
while i < 2:
time.sleep(delaytime)
file = open(spdx_file,'wt')
try:
p = subprocess.Popen(rest_api_cmd, shell=True, universal_newlines=True, stdout=file)
except subprocess.CalledProcessError as e:
raise Exception("get_spdx failed: \n%s" % e.output.decode("utf-8"))
ret_code = p.wait()
file.flush()
time.sleep(delaytime)
file.close()
file = open(spdx_file,'r+')
first_line = file.readline()
if "SPDXVersion" in first_line:
line = file.readline()
while line:
if "LicenseID:" in line:
empty = False
break
line = file.readline()
file.close()
if empty == True:
print("Hasn't get license info.")
return False
else:
return True
else:
print("Get the first line is " + first_line)
print("spdx is not correct, will try again.")
file.close()
os.remove(spdx_file)
i += 1
time.sleep(delaytime*4)
return False
def scanOne(srcArchive):
srcArchive = os.path.basename(srcArchive)
spdx_file = srcArchive + ".spdx"
spdx_file = os.path.join(dstPath, spdx_file)
upload_id = False
i = 0
print("#########################")
print("Begin to create spdx for %s" % srcArchive)
print("#########################\n")
if reuse:
upload_id = has_uploaded(srcArchive)
if upload_id == False:
upload_id = upload(srcArchive)
else:
upload_id = upload(srcArchive)
if analysis(upload_id) == False:
return False
while i < 6:
report_id = trigger(upload_id)
if report_id == False:
return False
if i > 0:
time.sleep(delaytime*i)
spdx2tv = get_spdx(report_id, spdx_file)
if spdx2tv == False:
print("get_spdx of %s is unnormal. Will try again!" % srcArchive)
else:
print("#########################")
print("SUCCESS!")
print("#########################\n")
return True
def scanSrc():
currDir = os.path.abspath(os.curdir)
srcDir = os.path.dirname(srcPath)
srcArchive = os.path.basename(srcPath)
os.chdir(srcDir)
scanOne(srcArchive)
os.chdir(currDir)
def scanDir():
currDir = os.path.abspath(os.curdir)
#print("currDir = %s \n" % currDir)
#print("srcArchiveDir = %s \n" % srcArchiveDir)
os.chdir(srcPath)
list = os.listdir(srcPath)
for i in list:
#print("Begin to scan %s \n" % i)
scanOne(i)
os.chdir(currDir)
"""Begin to scan src"""
if os.path.isdir(srcPath):
scanDir()
else:
scanSrc()