Skip to content

Commit c2b39bd

Browse files
Refactor all gc07
1 parent c5af4e8 commit c2b39bd

File tree

4 files changed

+214
-851
lines changed
  • src/lambda
    • gc07_check_certificate_authorities
    • gc07_check_cryptographic_algorithms
    • gc07_check_encryption_in_transit
    • gc07_check_secure_network_transmission_policy

4 files changed

+214
-851
lines changed

src/lambda/gc07_check_certificate_authorities/app.py

+37-214
Original file line numberDiff line numberDiff line change
@@ -3,176 +3,17 @@
33

44
import json
55
import logging
6-
import time
7-
import re
86

9-
import boto3
10-
import botocore
7+
from utils import is_scheduled_notification, check_required_parameters
8+
from boto_util.client import get_client
9+
from boto_util.config import build_evaluation, submit_evaluations
10+
from boto_util.s3 import check_s3_object_exists, get_lines_from_s3_file
11+
from boto_util.acm import list_all_acm_certificates, describe_acm_certificate
1112

12-
ASSUME_ROLE_MODE = True
13-
ACCOUNT_RESOURCE_TYPE = "AWS::::Account"
1413

15-
16-
def evaluate_parameters(rule_parameters):
17-
"""Evaluate the rule parameters dictionary validity. Raise a Exception for invalid parameters.
18-
Keyword arguments:
19-
rule_parameters -- the Key/Value dictionary of the Config Rule parameters
20-
"""
21-
if "S3CasCurrentlyInUsePath" not in rule_parameters:
22-
logger.error('The parameter with "S3CasCurrentlyInUsePath" as key must be defined.')
23-
raise ValueError('The parameter with "S3CasCurrentlyInUsePath" as key must be defined.')
24-
if not rule_parameters["S3CasCurrentlyInUsePath"]:
25-
logger.error('The parameter "S3CasCurrentlyInUsePath" must have a defined value.')
26-
raise ValueError('The parameter "S3CasCurrentlyInUsePath" must have a defined value.')
27-
return rule_parameters
28-
29-
30-
def build_evaluation(resource_id, compliance_type, event, resource_type, annotation=None):
31-
"""Form an evaluation as a dictionary. Usually suited to report on scheduled rules.
32-
Keyword arguments:
33-
resource_id -- the unique id of the resource to report
34-
compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE
35-
event -- the event variable given in the lambda handler
36-
resource_type -- the CloudFormation resource type (or AWS::::Account) to report on the rule
37-
annotation -- an annotation to be added to the evaluation (default None)
38-
"""
39-
eval_cc = {}
40-
if annotation:
41-
eval_cc["Annotation"] = annotation
42-
eval_cc["ComplianceResourceType"] = resource_type
43-
eval_cc["ComplianceResourceId"] = resource_id
44-
eval_cc["ComplianceType"] = compliance_type
45-
eval_cc["OrderingTimestamp"] = str(json.loads(event["invokingEvent"])["notificationCreationTime"])
46-
return eval_cc
47-
48-
49-
def get_client(service, event, region="ca-central-1"):
50-
"""Return the service boto client. It should be used instead of directly calling the client.
51-
Keyword arguments:
52-
service -- the service name used for calling the boto.client()
53-
event -- the event variable given in the lambda handler
54-
"""
55-
if not ASSUME_ROLE_MODE or (AWS_ACCOUNT_ID == AUDIT_ACCOUNT_ID):
56-
return boto3.client(service, region_name=region)
57-
execution_role_arn = f"arn:aws:iam::{AWS_ACCOUNT_ID}:role/{EXECUTION_ROLE_NAME}"
58-
credentials = get_assume_role_credentials(execution_role_arn, region)
59-
return boto3.client(
60-
service,
61-
region_name=region,
62-
aws_access_key_id=credentials["AccessKeyId"],
63-
aws_secret_access_key=credentials["SecretAccessKey"],
64-
aws_session_token=credentials["SessionToken"],
65-
)
66-
67-
68-
def get_assume_role_credentials(role_arn, region="ca-central-1"):
69-
"""Return the service boto client. It should be used instead of directly calling the client.
70-
Keyword arguments:
71-
service -- the service name used for calling the boto.client()
72-
event -- the event variable given in the lambda handler
73-
"""
74-
sts_client = boto3.client("sts", region_name=region)
75-
try:
76-
assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution")
77-
return assume_role_response["Credentials"]
78-
except botocore.exceptions.ClientError as ex:
79-
if "AccessDenied" in ex.response["Error"]["Code"]:
80-
ex.response["Error"]["Message"] = "AWS Config does not have permission to assume the IAM role."
81-
else:
82-
ex.response["Error"]["Message"] = "InternalError"
83-
ex.response["Error"]["Code"] = "InternalError"
84-
raise ex
85-
86-
87-
def is_scheduled_notification(message_type):
88-
"""Check whether the message is a ScheduledNotification or not.
89-
Keyword arguments:
90-
message_type -- the message type
91-
"""
92-
return message_type == "ScheduledNotification"
93-
94-
95-
def check_s3_object_exists(aws_s3_client, object_path: str) -> bool:
96-
"""Check if the S3 object exists
97-
Keyword arguments:
98-
object_path -- the S3 object path
99-
"""
100-
# parse the S3 path
101-
match = re.match(r"s3:\/\/([^/]+)\/((?:[^/]*/)*.*)", object_path)
102-
if match:
103-
bucket_name = match.group(1)
104-
key_name = match.group(2)
105-
else:
106-
logger.error("Unable to parse S3 object path %s", object_path)
107-
raise ValueError(f"Unable to parse S3 object path {object_path}")
108-
try:
109-
aws_s3_client.head_object(Bucket=bucket_name, Key=key_name)
110-
except botocore.exceptions.ClientError as err:
111-
if err.response["Error"]["Code"] == "404":
112-
# The object does not exist.
113-
logger.info("Object %s not found in bucket %s", key_name, bucket_name)
114-
return False
115-
elif err.response["Error"]["Code"] == "403":
116-
# AccessDenied
117-
logger.info("Access denied to bucket %s", bucket_name)
118-
return False
119-
else:
120-
# Something else has gone wrong.
121-
logger.error("Error trying to find object %s in bucket %s", key_name, bucket_name)
122-
raise ValueError(f"Error trying to find object {key_name} in bucket {bucket_name}") from err
123-
else:
124-
# The object does exist.
125-
return True
126-
127-
128-
def extract_bucket_name_and_key(object_path: str) -> tuple[str, str]:
129-
match = re.match(r"s3:\/\/([^/]+)\/((?:[^/]*/)*.*)", object_path)
130-
if match:
131-
bucket_name = match.group(1)
132-
key_name = match.group(2)
133-
else:
134-
logger.error("Unable to parse S3 object path %s", object_path)
135-
raise ValueError(f"Unable to parse S3 object path {object_path}")
136-
return bucket_name, key_name
137-
138-
139-
def get_lines_from_s3_file(aws_s3_client, s3_file_path: str) -> list[str]:
140-
bucket, key = extract_bucket_name_and_key(s3_file_path)
141-
response = aws_s3_client.get_object(Bucket=bucket, Key=key)
142-
return response.get("Body").read().decode("utf-8").splitlines()
143-
144-
145-
def acm_list_all_certificates(acm_client, page_size: int = 50, interval_between_calls: float = 0.1) -> list[dict]:
146-
"""
147-
Get a list of all the AWS Certificate Manager Certificates
148-
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/acm/paginator/ListCertificates.html
149-
"""
150-
resources: list[dict] = []
151-
paginator = acm_client.get_paginator("list_certificates")
152-
# Override the default 'keyTypes' so that we get all the types
153-
page_iterator = paginator.paginate(Includes={"keyTypes": []}, PaginationConfig={"PageSize": page_size})
154-
for page in page_iterator:
155-
resources.extend(page.get("CertificateSummaryList", []))
156-
time.sleep(interval_between_calls)
157-
return resources
158-
159-
160-
def acm_describe_certificate(acm_client, certificate_arn: str) -> dict | None:
161-
try:
162-
response: dict | None = acm_client.describe_certificate(CertificateArn=certificate_arn)
163-
return None if not response else response.get("Certificate", None)
164-
except botocore.exceptions.ClientError as ex:
165-
# Scrub error message for any internal account info leaks
166-
if "AccessDenied" in ex.response["Error"]["Code"]:
167-
ex.response["Error"]["Message"] = "AWS Config does not have permission to assume the IAM role."
168-
elif "ResourceNotFound" in ex.response["Error"]["Code"]:
169-
ex.response["Error"]["Message"] = "ResourceNotFound Error calling 'describe_certificate'"
170-
elif "InvalidArn" in ex.response["Error"]["Code"]:
171-
ex.response["Error"]["Message"] = "InvalidArn Error calling 'describe_certificate'"
172-
else:
173-
ex.response["Error"]["Message"] = "InternalError"
174-
ex.response["Error"]["Code"] = "InternalError"
175-
raise ex
14+
# Logging setup
15+
logger = logging.getLogger()
16+
logger.setLevel(logging.INFO)
17617

17718

17819
def assess_certificate_manager_enforcement(
@@ -201,75 +42,57 @@ def assess_certificate_manager_enforcement(
20142
return evaluations, all_resources_are_compliant
20243

20344

204-
def submit_evaluations(
205-
aws_config_client, result_token: str, evaluations: list[dict], interval_between_calls: float = 0.1
206-
):
207-
max_evaluations_per_call = 100
208-
while evaluations:
209-
batch_of_evaluations, evaluations = (
210-
evaluations[:max_evaluations_per_call],
211-
evaluations[max_evaluations_per_call:],
212-
)
213-
aws_config_client.put_evaluations(Evaluations=batch_of_evaluations, ResultToken=result_token)
214-
if evaluations:
215-
time.sleep(interval_between_calls)
216-
217-
21845
def lambda_handler(event, context):
219-
"""Lambda handler to check CloudTrail trails are logging.
220-
Keyword arguments:
221-
event -- the event variable given in the lambda handler
222-
context -- the context variable given in the lambda handler
22346
"""
224-
global logger
225-
logger = logging.getLogger()
226-
logger.setLevel(logging.INFO)
227-
228-
global AWS_ACCOUNT_ID
229-
global AUDIT_ACCOUNT_ID
230-
global EXECUTION_ROLE_NAME
231-
232-
page_size = 100
233-
interval_between_api_calls = 0.1
47+
This function is the main entry point for Lambda.
23448
235-
rule_parameters = json.loads(event.get("ruleParameters", "{}"))
236-
invoking_event = json.loads(event.get("invokingEvent", "{}"))
237-
logger.info("Received event: %s", json.dumps(event, indent=2))
49+
Keyword arguments:
23850
239-
AWS_ACCOUNT_ID = event["accountId"]
240-
logger.info("Assessing account %s", AWS_ACCOUNT_ID)
51+
event -- the event variable given in the lambda handler
24152
242-
valid_rule_parameters = evaluate_parameters(rule_parameters)
243-
EXECUTION_ROLE_NAME = valid_rule_parameters.get("ExecutionRoleName", "AWSA-GCLambdaExecutionRole")
244-
AUDIT_ACCOUNT_ID = valid_rule_parameters.get("AuditAccountID", "")
53+
context -- the context variable given in the lambda handler
54+
"""
55+
logger.info("Received Event: %s", json.dumps(event, indent=2))
24556

57+
invoking_event = json.loads(event["invokingEvent"])
24658
if not is_scheduled_notification(invoking_event["messageType"]):
24759
logger.error("Skipping assessments as this is not a scheduled invocation")
24860
return
24961

250-
aws_config_client = get_client("config", event)
251-
# Not using get_client to get S3 client for the Audit account
252-
aws_s3_client = boto3.client("s3")
253-
aws_acm_client = get_client("acm", event)
254-
25562
file_param_name = "S3CasCurrentlyInUsePath"
256-
cas_currently_in_use_file_path = valid_rule_parameters.get(file_param_name, "")
63+
rule_parameters = check_required_parameters(
64+
json.loads(event.get("ruleParameters", "{}")), ["ExecutionRoleName", file_param_name]
65+
)
66+
execution_role_name = rule_parameters.get("ExecutionRoleName")
67+
audit_account_id = rule_parameters.get("AuditAccountID", "")
68+
aws_account_id = event["accountId"]
69+
is_not_audit_account = aws_account_id != audit_account_id
70+
71+
page_size = 100
72+
interval_between_api_calls = 0.1
73+
74+
aws_config_client = get_client("config", aws_account_id, execution_role_name, is_not_audit_account)
75+
# Get the S3 client for the current (Audit) account where this lambda runs from
76+
aws_s3_client = get_client("s3")
77+
aws_acm_client = get_client("acm", aws_account_id, execution_role_name, is_not_audit_account)
78+
79+
cas_currently_in_use_file_path = rule_parameters.get(file_param_name, "")
25780

25881
if not check_s3_object_exists(aws_s3_client, cas_currently_in_use_file_path):
25982
annotation = (
26083
f"No file found for s3 path '{cas_currently_in_use_file_path}' via '{file_param_name}' input parameter."
26184
)
26285
logger.info(annotation)
263-
evaluations = [build_evaluation(AWS_ACCOUNT_ID, "NON_COMPLIANT", event, ACCOUNT_RESOURCE_TYPE, annotation)]
264-
aws_config_client.put_evaluations(Evaluations=evaluations, ResultToken=event["resultToken"])
86+
evaluations = [build_evaluation(aws_account_id, "NON_COMPLIANT", event, annotation=annotation)]
87+
submit_evaluations(aws_config_client, event["resultToken"], evaluations, interval_between_api_calls)
26588
return
26689

26790
cas_currently_in_use = get_lines_from_s3_file(aws_s3_client, cas_currently_in_use_file_path)
26891
logger.info("cas_currently_in_use from the file in s3: %s", cas_currently_in_use)
26992

270-
certificates_summaries = acm_list_all_certificates(aws_acm_client, page_size, interval_between_api_calls)
93+
certificates_summaries = list_all_acm_certificates(aws_acm_client, page_size, interval_between_api_calls)
27194
certificates_descriptions = [
272-
acm_describe_certificate(aws_acm_client, x.get("CertificateArn", "")) for x in certificates_summaries
95+
describe_acm_certificate(aws_acm_client, x.get("CertificateArn", "")) for x in certificates_summaries
27396
]
27497

27598
evaluations, all_acm_resources_are_compliant = assess_certificate_manager_enforcement(
@@ -284,5 +107,5 @@ def lambda_handler(event, context):
284107
annotation = "Non-compliant resources found in scope."
285108

286109
logger.info(f"{compliance_type}: {annotation}")
287-
evaluations.append(build_evaluation(AWS_ACCOUNT_ID, compliance_type, event, ACCOUNT_RESOURCE_TYPE, annotation))
110+
evaluations.append(build_evaluation(aws_account_id, compliance_type, event, annotation=annotation))
288111
submit_evaluations(aws_config_client, event["resultToken"], evaluations, interval_between_api_calls)

0 commit comments

Comments
 (0)