Skip to content

Commit

Permalink
Update GC02CheckAccountManagementPlanLambda
Browse files Browse the repository at this point in the history
  • Loading branch information
david-dearden-ssc committed Dec 11, 2024
1 parent 88bb34d commit 4959a38
Showing 1 changed file with 31 additions and 74 deletions.
105 changes: 31 additions & 74 deletions src/lambda/gc02_check_account_mgmt_plan/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,99 +5,56 @@
import json
import logging

from utils import is_scheduled_notification, check_required_parameters
from boto_util.client import get_client
from boto_util.config import is_scheduled_notification, build_evaluation
from boto_util.config import build_evaluation, submit_evaluations
from boto_util.s3 import check_s3_object_exists

# Logging setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Set to True to get the lambda to assume the Role attached on the Config Service
ASSUME_ROLE_MODE = True
DEFAULT_RESOURCE_TYPE = "AWS::::Account"


def evaluate_parameters(rule_parameters):
"""Evaluate the rule parameters dictionary validity. Raise a ValueError for invalid parameters.
Keyword arguments:
rule_parameters -- the Key/Value dictionary of the Config Rules parameters
"""
if "s3ObjectPath" not in rule_parameters:
logger.error('The parameter with "s3ObjectPath" as key must be defined.')
raise ValueError('The parameter with "s3ObjectPath" as key must be defined.')
if not rule_parameters["s3ObjectPath"]:
logger.error('The parameter "s3ObjectPath" must have a defined value.')
raise ValueError('The parameter "s3ObjectPath" must have a defined value.')
return rule_parameters


def lambda_handler(event, context):
"""Lambda handler to check CloudTrail trails are logging.
"""
Keyword arguments:
event -- the event variable given in the lambda handler
context -- the context variable given in the lambda handler
"""
global AWS_CONFIG_CLIENT
global AWS_S3_CLIENT
global AWS_ACCOUNT_ID
global EXECUTION_ROLE_NAME
global AUDIT_ACCOUNT_ID
logger.info("Received Event: %s", json.dumps(event, indent=2))

evaluations = []
rule_parameters = {}
rule_parameters = json.loads(event.get("ruleParameters", "{}"))
valid_rule_parameters = check_required_parameters(rule_parameters, ["s3ObjectPath"], logger)
execution_role_name = valid_rule_parameters.get("ExecutionRoleName", "AWSA-GCLambdaExecutionRole")
audit_account_id = valid_rule_parameters.get("AuditAccountID", "")
invoking_event = json.loads(event["invokingEvent"])
logger.info("Received Event: %s", json.dumps(event, indent=2))
aws_account_id = event["accountId"]
evaluations = []

# parse parameters
AWS_ACCOUNT_ID = event["accountId"]
compliance_type = "NOT_APPLICABLE"
annotation = "Guardrail only applicable in the Audit Account"

if "ruleParameters" in event:
rule_parameters = json.loads(event["ruleParameters"])
if not is_scheduled_notification(invoking_event["messageType"]):
logger.error("Skipping assessments as this is not a scheduled invocation")
return

valid_rule_parameters = evaluate_parameters(rule_parameters)
if aws_account_id != audit_account_id:
logger.info(
"Account management plan document not checked in account %s - not the Audit account", aws_account_id
)
return

if "ExecutionRoleName" in valid_rule_parameters:
EXECUTION_ROLE_NAME = valid_rule_parameters["ExecutionRoleName"]
else:
EXECUTION_ROLE_NAME = "AWSA-GCLambdaExecutionRole"
aws_config_client = get_client("config", aws_account_id, execution_role_name)
aws_s3_client = get_client("s3", aws_account_id, execution_role_name)

if "AuditAccountID" in valid_rule_parameters:
AUDIT_ACCOUNT_ID = valid_rule_parameters["AuditAccountID"]
if check_s3_object_exists(aws_s3_client, valid_rule_parameters["s3ObjectPath"]):
compliance_type = "COMPLIANT"
annotation = "Account management plan document found"
else:
AUDIT_ACCOUNT_ID = ""

compliance_value = "NOT_APPLICABLE"
custom_annotation = "Guardrail only applicable in the Audit Account"
compliance_type = "NON_COMPLIANT"
annotation = "Account management plan document NOT found"

# is this a scheduled invocation?
if is_scheduled_notification(invoking_event["messageType"]):
# yes, proceed
# is this being executed against the Audit Account -
# (this check only applies to the audit account)
if AWS_ACCOUNT_ID == AUDIT_ACCOUNT_ID:
# yes, we're in the Audit account
AWS_CONFIG_CLIENT = get_client("config", event, ASSUME_ROLE_MODE, AWS_ACCOUNT_ID)
AWS_S3_CLIENT = get_client("s3", event, ASSUME_ROLE_MODE, AWS_ACCOUNT_ID)
# check if object exists in S3
if check_s3_object_exists(AWS_S3_CLIENT, valid_rule_parameters["s3ObjectPath"]):
compliance_value = "COMPLIANT"
custom_annotation = "Account management plan document found"
else:
compliance_value = "NON_COMPLIANT"
custom_annotation = "Account management plan document NOT found"
# Update AWS Config with the evaluation result
evaluations.append(
build_evaluation(
event["accountId"],
compliance_value,
event,
resource_type=DEFAULT_RESOURCE_TYPE,
annotation=custom_annotation,
)
)
AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluations, ResultToken=event["resultToken"])
else:
logger.info(
"Account management plan document not checked in account %s - not the Audit account", AWS_ACCOUNT_ID
)
evaluations.append(build_evaluation(aws_account_id, compliance_type, event, annotation=annotation))
submit_evaluations(aws_config_client, event["resultToken"], evaluations)

0 comments on commit 4959a38

Please sign in to comment.