Skip to content

version 1.1 #286

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 27, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 80 additions & 48 deletions examples/client/refresh_project_copyrights.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# Ian Ashworth, May 2025
#
import http.client
import signal
from sys import api_version
import sys
import csv
Expand All @@ -18,11 +19,18 @@

http.client._MAXHEADERS = 1000

job_status = 0

logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] {%(module)s:%(lineno)d} %(levelname)s - %(message)s"
)

# initialise
all_my_comp_data = []
my_statistics = {}


def RepDebug(level, msg):
if hasattr(args, 'debug') and level <= args.debug:
print("dbg{" + str(level) + "} " + msg)
Expand All @@ -33,6 +41,59 @@ def RepWarning(msg):
print("WARNING: " + msg)
return True

def CompleteTask(job_status):
now = datetime.datetime.now()
my_statistics['_jobStatus'] = job_status

print('Finished: %s' % now.strftime("%Y-%m-%d %H:%M:%S"))
print('Summary:')
pprint(my_statistics)

# if dumping data
if args.dump_data:
# if outputting to a CSV file
if args.csv_file:
'''Note: See the BD API doc and in particular .../api-doc/public.html#_bom_vulnerability_endpoints
for a complete list of the fields available. The below code shows a subset of them just to
illustrate how to write out the data into a CSV format.
'''
logging.info(f"Exporting {len(all_my_comp_data)} records to CSV file {args.csv_file}")

with open(args.csv_file, 'w') as csv_f:
field_names = [
'Component',
'Component Version',
'Status',
'Url'
]

writer = csv.DictWriter(csv_f, fieldnames=field_names)
writer.writeheader()

for my_comp_data in all_my_comp_data:
row_data = {
'Component': my_comp_data['componentName'],
'Component Version': my_comp_data['componentVersion'],
'Status': my_comp_data['status'],
'Url': my_comp_data['url']
}
writer.writerow(row_data)
else:
# print to screen
pprint(all_my_comp_data)

def SignalHandler(sig, frame):
# Complete the work
print("Ctrl+C detected!")

# tidy up and complete the job
CompleteTask(1)
sys.exit(job_status)

# ------------------------------------------------------------------------------
# register the signal handler
signal.signal(signal.SIGINT, SignalHandler)


# Parse command line arguments
parser = argparse.ArgumentParser("Refresh copyrights for project/version components")
Expand Down Expand Up @@ -74,9 +135,8 @@ def RepWarning(msg):
retries=args.retries,
)

# initialise
all_my_comp_data = []
my_statistics = {}

str_unknown = "n/a"

str_unknown = "n/a"

Expand Down Expand Up @@ -121,10 +181,13 @@ def RepWarning(msg):
my_statistics['_cntProjects'] = 0
my_statistics['_cntVersions'] = 0
my_statistics['_cntComponents'] = 0
my_statistics['_cntOrigins'] = 0

my_statistics['_cntRefresh'] = 0
my_statistics['_cntNoOrigins'] = 0
my_statistics['_cntNoIDs'] = 0
my_statistics['_cntSkippedProjects'] = 0
my_statistics['_jobStatus'] = 0

# record any control values
if args.project_name:
Expand Down Expand Up @@ -156,17 +219,18 @@ def RepWarning(msg):
projects = bd.get_resource('projects')


cnt_projects = 0
cnt_project = 0
cnt_call = 0

# loop through projects list
for this_project in projects:

cnt_projects += 1
cnt_project += 1

# check if we are skipping over this project
if args.skip_projects and cnt_projects <= args.skip_projects:
if args.skip_projects and cnt_project <= args.skip_projects:
my_statistics['_cntSkippedProjects'] += 1
RepDebug(1, 'Skipping project [%d] [%s]' % (cnt_projects, this_project['name']))
RepDebug(1, 'Skipping project [%d] [%s]' % (cnt_project, this_project['name']))
continue

# check if we have hit any limit
Expand All @@ -180,7 +244,7 @@ def RepWarning(msg):

# process this project
my_statistics['_cntProjects'] += 1
RepDebug(1, '## Project: [%d] [%s]' % (cnt_projects, this_project['name']))
RepDebug(1, '## Project: [%d] [%s]' % (cnt_project, this_project['name']))

if args.version_name:
# note the specific project version of interest
Expand Down Expand Up @@ -220,7 +284,6 @@ def RepWarning(msg):
for this_comp_data in bd.get_resource('components', this_version, **comp_kwargs):

if args.max_components and my_statistics['_cntComponents'] >= args.max_components:
RepDebug(1, 'Reached component limit [%d]' % args.max_components)
break

my_statistics['_cntComponents'] += 1
Expand Down Expand Up @@ -255,6 +318,8 @@ def RepWarning(msg):
for this_origin in this_comp_data['origins']:

n_origin += 1
my_statistics['_cntOrigins'] += 1

if this_origin.get('externalId'):
origin_id = this_origin['externalId']
else:
Expand All @@ -266,10 +331,14 @@ def RepWarning(msg):
url += "/copyrights-refresh"

status = -1
cnt_call += 1
call_id = "{}.{}".format(cnt_project, cnt_call)

if args.dry_run != 0:
RepDebug(2, "DryRun: origin - no [%d] id [%s] url [%s]" % (n_origin, origin_id, url))
RepDebug(2, ' DryRun: %s - origin - no [%d] id [%s] url [%s]' % (call_id, n_origin, origin_id, url))
else:
RepDebug(3,
' Origin: %s - origin - no [%d] id [%s] url [%s]' % (call_id, n_origin, origin_id, url))
try:
response = bd.session.put(url, data=None, **refresh_kwargs)
RepDebug(5,'Refresh response: origin [%s] [%s]' % (this_origin, response))
Expand Down Expand Up @@ -319,42 +388,5 @@ def RepWarning(msg):

# end of processing loop

now = datetime.datetime.now()
print('Finished: %s' % now.strftime("%Y-%m-%d %H:%M:%S"))
print('Summary:')
pprint(my_statistics)

# if dumping data
if args.dump_data:
# if outputting to a CSV file
if args.csv_file:
'''Note: See the BD API doc and in particular .../api-doc/public.html#_bom_vulnerability_endpoints
for a complete list of the fields available. The below code shows a subset of them just to
illustrate how to write out the data into a CSV format.
'''
logging.info(f"Exporting {len(all_my_comp_data)} records to CSV file {args.csv_file}")

with open(args.csv_file, 'w') as csv_f:
field_names = [
'Component',
'Component Version',
'Status',
'Url'
]

writer = csv.DictWriter(csv_f, fieldnames=field_names)
writer.writeheader()

for my_comp_data in all_my_comp_data:
row_data = {
'Component': my_comp_data['componentName'],
'Component Version': my_comp_data['componentVersion'],
'Status': my_comp_data['status'],
'Url': my_comp_data['url']
}
writer.writerow(row_data)
else:
# print to screen
pprint(all_my_comp_data)

CompleteTask(0)
#end
Loading