From 2b66bf1303b63cfb38a7588dcd83d77ff032a9e4 Mon Sep 17 00:00:00 2001 From: TylerTMizuyabu Date: Wed, 11 Dec 2024 17:47:52 -0500 Subject: [PATCH] Gr01.6 (#84) Adds a lambda for guardrail check 1.06. This lambda queries Identity Center and IAM for all users, then checks their policies. It gives compliant if there is at least one privileged user with admin access, and if no non-privileged users have admin access. Also updates 02 because I believe the backslash isn't needed to escape the '{' character --- .../customizations/GCGuardrailsRoles.yaml | 1 - .../AuditAccountPreRequisitesPart2.yaml | 20 +- arch/templates/ConformancePack.yaml | 57 ++ arch/templates/main.yaml | 25 + .../aws_lambda_permissions_setup/app.py | 2 +- .../.gitignore | 244 +++++++ .../README.md | 39 ++ .../__init__.py | 0 .../gc01_check_dedicated_admin_account/app.py | 638 ++++++++++++++++++ .../events/event.json | 62 ++ .../requirements.txt | 0 .../template.yaml | 19 + .../app.py | 6 +- 13 files changed, 1098 insertions(+), 15 deletions(-) create mode 100644 src/lambda/gc01_check_dedicated_admin_account/.gitignore create mode 100644 src/lambda/gc01_check_dedicated_admin_account/README.md create mode 100644 src/lambda/gc01_check_dedicated_admin_account/__init__.py create mode 100644 src/lambda/gc01_check_dedicated_admin_account/app.py create mode 100644 src/lambda/gc01_check_dedicated_admin_account/events/event.json create mode 100644 src/lambda/gc01_check_dedicated_admin_account/requirements.txt create mode 100644 src/lambda/gc01_check_dedicated_admin_account/template.yaml diff --git a/arch/lza_extensions/customizations/GCGuardrailsRoles.yaml b/arch/lza_extensions/customizations/GCGuardrailsRoles.yaml index 6d45a45a..c79eb094 100644 --- a/arch/lza_extensions/customizations/GCGuardrailsRoles.yaml +++ b/arch/lza_extensions/customizations/GCGuardrailsRoles.yaml @@ -49,7 +49,6 @@ Resources: "arn:aws:sts::${AuditAccountID}:assumed-role/${RolePrefix}default_assessment_role/${OrganizationName}gc01_check_root_mfa", "arn:aws:sts::${AuditAccountID}:assumed-role/${RolePrefix}default_assessment_role/${OrganizationName}gc02_check_group_access_configuration", "arn:aws:sts::${AuditAccountID}:assumed-role/${RolePrefix}default_assessment_role/${OrganizationName}gc02_check_iam_password_policy", - "arn:aws:sts::${AuditAccountID}:assumed-role/${RolePrefix}default_assessment_role/${OrganizationName}gc02_check_least_privileged_roles", "arn:aws:sts::${AuditAccountID}:assumed-role/${RolePrefix}default_assessment_role/${OrganizationName}gc02_check_privileged_roles_review", "arn:aws:sts::${AuditAccountID}:assumed-role/${RolePrefix}default_assessment_role/${OrganizationName}gc03_check_endpoint_access_config", "arn:aws:sts::${AuditAccountID}:assumed-role/${RolePrefix}default_assessment_role/${OrganizationName}gc03_check_iam_cloudwatch_alarms", diff --git a/arch/templates/AuditAccountPreRequisitesPart2.yaml b/arch/templates/AuditAccountPreRequisitesPart2.yaml index c331d231..ff1b6eeb 100644 --- a/arch/templates/AuditAccountPreRequisitesPart2.yaml +++ b/arch/templates/AuditAccountPreRequisitesPart2.yaml @@ -37,35 +37,35 @@ Resources: Runtime: !Ref PythonRuntime Timeout: 180 - ## GC02 - GC02CheckIAMPasswordPolicyLambda: + GC01CheckDedicatedAdminAccountLambda: Condition: IsAuditAccount Type: AWS::Lambda::Function Properties: - FunctionName: !Sub "${OrganizationName}gc02_check_iam_password_policy" - Code: "../../src/lambda/gc02_check_iam_password_policy/build/GC02CheckIAMPasswordPolicyLambda/" + FunctionName: !Sub "${OrganizationName}gc01_check_dedicated_admin_account" + Code: "../../src/lambda/gc01_check_dedicated_admin_account/build/GC01CheckDedicatedAdminAccountLambda/" Handler: app.lambda_handler Role: !Sub "arn:aws:iam::${AuditAccountID}:role/${RolePrefix}default_assessment_role" Runtime: !Ref PythonRuntime Timeout: 180 - GC02CheckGroupAccessConfigurationLambda: + ## GC02 + GC02CheckIAMPasswordPolicyLambda: Condition: IsAuditAccount Type: AWS::Lambda::Function Properties: - FunctionName: !Sub "${OrganizationName}gc02_check_group_access_configuration" - Code: "../../src/lambda/gc02_check_group_access_configuration/build/GC02CheckGroupAccessConfigurationLambda/" + FunctionName: !Sub "${OrganizationName}gc02_check_iam_password_policy" + Code: "../../src/lambda/gc02_check_iam_password_policy/build/GC02CheckIAMPasswordPolicyLambda/" Handler: app.lambda_handler Role: !Sub "arn:aws:iam::${AuditAccountID}:role/${RolePrefix}default_assessment_role" Runtime: !Ref PythonRuntime Timeout: 180 - GC02CheckLeastPrivilegedRolesLambda: + GC02CheckGroupAccessConfigurationLambda: Condition: IsAuditAccount Type: AWS::Lambda::Function Properties: - FunctionName: !Sub "${OrganizationName}gc02_check_least_privileged_roles" - Code: "../../src/lambda/gc02_check_least_privileged_roles/build/GC02CheckLeastPrivilegedRolesLambda/" + FunctionName: !Sub "${OrganizationName}gc02_check_group_access_configuration" + Code: "../../src/lambda/gc02_check_group_access_configuration/build/GC02CheckGroupAccessConfigurationLambda/" Handler: app.lambda_handler Role: !Sub "arn:aws:iam::${AuditAccountID}:role/${RolePrefix}default_assessment_role" Runtime: !Ref PythonRuntime diff --git a/arch/templates/ConformancePack.yaml b/arch/templates/ConformancePack.yaml index 0a48a776..3acc02c1 100644 --- a/arch/templates/ConformancePack.yaml +++ b/arch/templates/ConformancePack.yaml @@ -51,6 +51,10 @@ Parameters: PasswordPolicyHardExpiry: Default: "False" Type: String + PrivilegedUsersFilePath: + Type: String + NonPrivilegedUsersFilePath: + Type: String # GC02 S3AccountManagementPlanPath: Type: String @@ -309,6 +313,49 @@ Resources: SourceDetails: - EventSource: "aws.config" MessageType: "ScheduledNotification" + + GC01CheckDedicatedAdminAccount: + Type: "AWS::Config::ConfigRule" + Properties: + ConfigRuleName: gc01_check_dedicated_admin_account + Description: Checks that there are dedicated user accounts for administration. + InputParameters: + PrivilegedUsersFilePath: + Fn::If: + - privilegedUsersFilePath + - Ref: PrivilegedUsersFilePath + - Ref: AWS::NoValue + NonPrivilegedUsersFilePath: + Fn::If: + - nonPrivilegedUsersFilePath + - Ref: NonPrivilegedUsersFilePath + - Ref: AWS::NoValue + ExecutionRoleName: + Fn::If: + - GCLambdaExecutionRoleName + - Ref: GCLambdaExecutionRoleName + - Ref: AWS::NoValue + AuditAccountID: + Fn::If: + - auditAccountID + - Ref: AuditAccountID + - Ref: AWS::NoValue + Scope: + ComplianceResourceTypes: + - AWS::Account + MaximumExecutionFrequency: TwentyFour_Hours + Source: + Owner: CUSTOM_LAMBDA + SourceIdentifier: + Fn::Join: + - "" + - - "arn:aws:lambda:ca-central-1:" + - Ref: AuditAccountID + - !Sub ":function:${OrganizationName}gc01_check_dedicated_admin_account" + SourceDetails: + - EventSource: "aws.config" + MessageType: "ScheduledNotification" + GC01CheckMFAIAMUsersConfigRule: Type: "AWS::Config::ConfigRule" Properties: @@ -1753,6 +1800,16 @@ Conditions: - Fn::Equals: - "" - Ref: PasswordPolicyHardExpiry + privilegedUsersFilePath: + Fn::Not: + - Fn::Equals: + - "" + - Ref: PrivilegedUsersFilePath + nonPrivilegedUsersFilePath: + Fn::Not: + - Fn::Equals: + - "" + - Ref: NonPrivilegedUsersFilePath # GC02 s3AccountManagementPlanPath: Fn::Not: diff --git a/arch/templates/main.yaml b/arch/templates/main.yaml index 0119d489..e85c13d8 100644 --- a/arch/templates/main.yaml +++ b/arch/templates/main.yaml @@ -266,6 +266,8 @@ Resources: "rds:Describe*", "resource-explorer-2:ListIndexes", "resource-explorer-2:Search", + "sso:List*", + "sso:GetInlinePolicyForPermissionSet", "s3:Get*", "s3:GetBucketPublicAccessBlock", "s3:List*", @@ -1179,6 +1181,14 @@ Resources: - "iam:Get*" - "iam:List*" - "iam:Simulate*" + - "iam:ListUserPolicies" + - "iam:GetUserPolicy" + - "iam:ListAttachedUserPolicies" + - "iam:ListPolicies" + - "iam:ListGroupsForUser" + - "iam:ListAttachedGroupPolicies" + - "iam:ListGroupPolicies" + - "iam:GetPolicyVersion" Resource: "*" Effect: Allow - Sid: AllowCloudWatchAlarmQueries @@ -1243,6 +1253,13 @@ Resources: - "s3:ListAllMyBuckets" - "s3:ListBucket" - "s3:GetBucketPublicAccessBlock" + - "sso:ListInstances" + - "sso:ListUsers" + - "sso:ListManagedPoliciesInPermissionSet" + - "sso:GetInlinePolicyForPermissionSet" + - "sso:ListCustomerManagedPolicyReferencesInPermissionSet" + - "sso:ListPermissionSetsProvisionedToAccount" + - "sso:ListAccountAssignments" - "sns:GetSubscriptionAttributes" - "sns:GetTopicAttributes" - "sns:ListSubscriptionsByTopic" @@ -1490,6 +1507,14 @@ Resources: ParameterValue: !Sub - "s3://${ClientEvidenceBucket}/gc-03/vpn_ip_ranges.txt" - ClientEvidenceBucket: !If [ GenerateEvidenceBucketName, !GetAtt GenerateEvidenceBucketName.EvidenceBucketName, !Ref EvidenceBucketName ] + - ParameterName: PrivilegedUsersFilePath + ParameterValue: !Sub + - "s3://${ClientEvidenceBucket}/gc-01/privileged_users.txt" + - ClientEvidenceBucket: !If [ GenerateEvidenceBucketName, !GetAtt GenerateEvidenceBucketName.EvidenceBucketName, !Ref EvidenceBucketName ] + - ParameterName: NonPrivilegedUsersFilePath + ParameterValue: !Sub + - "s3://${ClientEvidenceBucket}/gc-01/non_privileged_users.txt" + - ClientEvidenceBucket: !If [ GenerateEvidenceBucketName, !GetAtt GenerateEvidenceBucketName.EvidenceBucketName, !Ref EvidenceBucketName ] OrganizationConformancePackName: !Sub "${OrganizationName}-GC-CP-Guardrails" TemplateS3Uri: !Sub s3://${PipelineBucket}/${DeployVersion}/ConformancePack.yaml diff --git a/src/lambda/aws_lambda_permissions_setup/app.py b/src/lambda/aws_lambda_permissions_setup/app.py index e940903b..72bb0bc2 100644 --- a/src/lambda/aws_lambda_permissions_setup/app.py +++ b/src/lambda/aws_lambda_permissions_setup/app.py @@ -72,6 +72,7 @@ def apply_lambda_permissions(): lambda_functions = { f"{organization_name}gc01_check_alerts_flag_misuse": ["GC01CheckAlertsFlagMisuseLambda"], f"{organization_name}gc01_check_attestation_letter": ["GC01CheckAttestationLetterLambda"], + f"{organization_name}gc01_check_dedicated_admin_account": ["GC01CheckDedicatedAdminAccountLambda"], f"{organization_name}gc01_check_federated_users_mfa": ["GC01CheckFederatedUsersMFALambda"], f"{organization_name}gc01_check_iam_users_mfa": ["GC01CheckIAMUsersMFALambda"], f"{organization_name}gc01_check_mfa_digital_policy": ["GC01CheckMFADigitalPolicy"], @@ -81,7 +82,6 @@ def apply_lambda_permissions(): f"{organization_name}gc02_check_password_protection_mechanisms": ["GC02CheckPasswordProtectionMechanismsLambda"], f"{organization_name}gc02_check_iam_password_policy": ["GC02CheckIAMPasswordPolicyLambda"], f"{organization_name}gc02_check_group_access_configuration": ["GC02CheckGroupAccessConfigurationLambda"], - f"{organization_name}gc02_check_least_privileged_roles": ["GC02CheckLeastPrivilegedRolesLambda"], f"{organization_name}gc02_check_privileged_roles_review": ["GC02CheckPrivilegedRolesReviewLambda"], f"{organization_name}gc03_check_endpoint_access_config": ["GC03CheckEndpointAccessConfigLambda"], f"{organization_name}gc03_check_iam_cloudwatch_alarms": ["GC03CheckIAMCloudWatchAlarmsLambda"], diff --git a/src/lambda/gc01_check_dedicated_admin_account/.gitignore b/src/lambda/gc01_check_dedicated_admin_account/.gitignore new file mode 100644 index 00000000..4808264d --- /dev/null +++ b/src/lambda/gc01_check_dedicated_admin_account/.gitignore @@ -0,0 +1,244 @@ + +# Created by https://www.gitignore.io/api/osx,linux,python,windows,pycharm,visualstudiocode + +### Linux ### +*~ + +# temporary files which can be created if a process still has a handle open of a deleted file +.fuse_hidden* + +# KDE directory preferences +.directory + +# Linux trash folder which might appear on any partition or disk +.Trash-* + +# .nfs files are created when an open file is removed but is still being accessed +.nfs* + +### OSX ### +*.DS_Store +.AppleDouble +.LSOverride + +# Icon must end with two \r +Icon + +# Thumbnails +._* + +# Files that might appear in the root of a volume +.DocumentRevisions-V100 +.fseventsd +.Spotlight-V100 +.TemporaryItems +.Trashes +.VolumeIcon.icns +.com.apple.timemachine.donotpresent + +# Directories potentially created on remote AFP share +.AppleDB +.AppleDesktop +Network Trash Folder +Temporary Items +.apdisk + +### PyCharm ### +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff: +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/dictionaries + +# Sensitive or high-churn files: +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.xml +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml + +# Gradle: +.idea/**/gradle.xml +.idea/**/libraries + +# CMake +cmake-build-debug/ + +# Mongo Explorer plugin: +.idea/**/mongoSettings.xml + +## File-based project format: +*.iws + +## Plugin-specific files: + +# IntelliJ +/out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# Ruby plugin and RubyMine +/.rakeTasks + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +### PyCharm Patch ### +# Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721 + +# *.iml +# modules.xml +# .idea/misc.xml +# *.ipr + +# Sonarlint plugin +.idea/sonarlint + +### Python ### +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +.pytest_cache/ +nosetests.xml +coverage.xml +*.cover +.hypothesis/ + +# Translations +*.mo +*.pot + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule.* + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ + +### VisualStudioCode ### +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +.history + +### Windows ### +# Windows thumbnail cache files +Thumbs.db +ehthumbs.db +ehthumbs_vista.db + +# Folder config file +Desktop.ini + +# Recycle Bin used on file shares +$RECYCLE.BIN/ + +# Windows Installer files +*.cab +*.msi +*.msm +*.msp + +# Windows shortcuts +*.lnk + +# Build folder + +*/build/* + +# End of https://www.gitignore.io/api/osx,linux,python,windows,pycharm,visualstudiocode \ No newline at end of file diff --git a/src/lambda/gc01_check_dedicated_admin_account/README.md b/src/lambda/gc01_check_dedicated_admin_account/README.md new file mode 100644 index 00000000..2928c0fa --- /dev/null +++ b/src/lambda/gc01_check_dedicated_admin_account/README.md @@ -0,0 +1,39 @@ +*This readme file was created by AWS Bedrock: anthropic.claude-v2* + +# ./src/lambda/gc01_check_dedicated_admin_account/app.py + +## Overview + +GC01 - Check For Dedicated Admin Accounts + +Demonstrates that there are dedicated user accounts for administration. + +## Functions + +### lambda_handler + +This is the main handler function, the entry point for the Lambda function. + +It loads the rule parameters, gets the relevant AWS clients with assumed roles if configured, checks if this is a scheduled run and runs the compliance check if so, returning evaluation results to AWS Config. + +### build_evaluation + +Builds an AWS Config evaluation object from the provided parameters. + +## Deployment + +This function runs on AWS Lambda and is deployed via the Serverless Framework and CI/CD pipelines. + +The Lambda execution role must have permissions to call AWS Config and IAM. + +Cross account access can be enabled by passing the Audit account ID and Execution role name. + +## Testing + +Testing is done via unit tests and manual invocation. + +## Logging + +Logging is done via the Python logging library. + +The lambda runtime logs are enabled. diff --git a/src/lambda/gc01_check_dedicated_admin_account/__init__.py b/src/lambda/gc01_check_dedicated_admin_account/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/lambda/gc01_check_dedicated_admin_account/app.py b/src/lambda/gc01_check_dedicated_admin_account/app.py new file mode 100644 index 00000000..26a0c9d1 --- /dev/null +++ b/src/lambda/gc01_check_dedicated_admin_account/app.py @@ -0,0 +1,638 @@ +""" GC02 - Check IAM Password Policy + https://canada-ca.github.io/cloud-guardrails/EN/02_Management-Admin-Privileges.html +""" +import json +import logging +import re +import time + +import boto3 +import botocore +import botocore.exceptions + +# Logging setup +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +# Set to True to get the lambda to assume the Role attached on the Config Service +ASSUME_ROLE_MODE = True +DEFAULT_RESOURCE_TYPE = "AWS::::Account" +USER_RESOURCE_TYPE = "AWS::IAM::User" + + +# This gets the client after assuming the Config service role +# either in the same AWS account or cross-account. +def get_client(service, event): + """Return the service boto client. It should be used instead of directly calling the client. + Keyword arguments: + service -- the service name used for calling the boto.client() + event -- the event variable given in the lambda handler + """ + if not ASSUME_ROLE_MODE or (AWS_ACCOUNT_ID == AUDIT_ACCOUNT_ID): + return boto3.client(service) + execution_role_arn = f"arn:aws:iam::{AWS_ACCOUNT_ID}:role/{EXECUTION_ROLE_NAME}" + credentials = get_assume_role_credentials(execution_role_arn) + return boto3.client( + service, + aws_access_key_id=credentials["AccessKeyId"], + aws_secret_access_key=credentials["SecretAccessKey"], + aws_session_token=credentials["SessionToken"], + ) + + +def get_assume_role_credentials(role_arn): + """Returns the temporary credentials from ASSUME_ROLE_MODE role. + Keyword arguments: + role_arn -- the ARN of the role to assume + """ + sts_client = boto3.client("sts") + try: + assume_role_response = sts_client.assume_role( + RoleArn=role_arn, + RoleSessionName="configLambdaExecution" + ) + return assume_role_response["Credentials"] + except botocore.exceptions.ClientError as ex: + # Scrub error message for any internal account info leaks + if "AccessDenied" in ex.response["Error"]["Code"]: + ex.response["Error"]["Message"] = "AWS Config does not have permission to assume the IAM role." + else: + ex.response["Error"]["Message"] = "InternalError" + ex.response["Error"]["Code"] = "InternalError" + raise ex + + +def is_scheduled_notification(message_type): + """Check whether the message is a ScheduledNotification or not. + Keyword arguments: + message_type -- the message type + """ + return message_type == "ScheduledNotification" + + +def evaluate_parameters(rule_parameters): + """Evaluate the rule parameters. + Keyword arguments: + rule_parameters -- the Key/Value dictionary of the rule parameters + """ + if "PrivilegedUsersFilePath" not in rule_parameters: + logger.error('The parameter with "PrivilegedUsersFilePath" as key must be defined.') + raise ValueError('The parameter with "PrivilegedUsersFilePath" as key must be defined.') + if not rule_parameters["PrivilegedUsersFilePath"]: + logger.error('The parameter "PrivilegedUsersFilePath" must have a defined value.') + raise ValueError('The parameter "PrivilegedUsersFilePath" must have a defined value.') + + if "NonPrivilegedUsersFilePath" not in rule_parameters: + logger.error('The parameter with "NonPrivilegedUsersFilePath" as key must be defined.') + raise ValueError('The parameter with "NonPrivilegedUsersFilePath" as key must be defined.') + if not rule_parameters["NonPrivilegedUsersFilePath"]: + logger.error('The parameter "NonPrivilegedUsersFilePath" must have a defined value.') + raise ValueError('The parameter "NonPrivilegedUsersFilePath" must have a defined value.') + + return rule_parameters + + +# This generate an evaluation for config +def build_evaluation( + resource_id, + compliance_type, + event, + resource_type=DEFAULT_RESOURCE_TYPE, + annotation=None, +): + """Form an evaluation as a dictionary. Usually suited to report on scheduled rules. + Keyword arguments: + resource_id -- the unique id of the resource to report + compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE + event -- the event variable given in the lambda handler + resource_type -- the CloudFormation resource type (or AWS::::Account) + to report on the rule (default DEFAULT_RESOURCE_TYPE) + annotation -- an annotation to be added to the evaluation (default None) + """ + eval_cc = {} + if annotation: + eval_cc["Annotation"] = annotation + eval_cc["ComplianceResourceType"] = resource_type + eval_cc["ComplianceResourceId"] = resource_id + eval_cc["ComplianceType"] = compliance_type + eval_cc["OrderingTimestamp"] = str( + json.loads(event["invokingEvent"])["notificationCreationTime"] + ) + return eval_cc + +def fetch_users(): + users = [] + marker = None + try: + while True: + response = AWS_IAM_CLIENT.list_users(Marker=marker) if marker else AWS_IAM_CLIENT.list_users() + users = users + response.get("Users", []) + marker = response.get("Marker") + if not marker: + break + except botocore.exceptions.ClientError as ex: + logger.info(ex) + ex.response["Error"]["Message"] = "InternalError" + ex.response["Error"]["Code"] = "InternalError" + raise ex + return users + +def extract_bucket_name_and_key(object_path): + match = re.match(r"s3:\/\/([^/]+)\/((?:[^/]*/)*.*)", object_path) + if match: + bucket_name = match.group(1) + key_name = match.group(2) + else: + logger.error("Unable to parse S3 object path %s", object_path) + raise ValueError(f"Unable to parse S3 object path {object_path}") + return bucket_name,key_name + +def read_s3_object(s3_file_path) -> str: + bucket, key = extract_bucket_name_and_key(s3_file_path) + response = AWS_S3_CLIENT.get_object(Bucket=bucket, Key=key) + rule_naming_convention = response.get("Body").read().decode("utf-8") + return rule_naming_convention + +def check_s3_object_exists(object_path): + """Check if the S3 object exists + Keyword arguments: + object_path -- the S3 object path + """ + # parse the S3 path + bucket_name, key_name = extract_bucket_name_and_key(object_path) + try: + AWS_S3_CLIENT.head_object(Bucket=bucket_name, Key=key_name) + except botocore.exceptions.ClientError as err: + if err.response["Error"]["Code"] == "404": + # The object does not exist. + logger.info("Object %s not found in bucket %s", key_name, bucket_name) + return False + elif err.response["Error"]["Code"] == "403": + # AccessDenied + logger.info("Access denied to bucket %s", bucket_name) + return False + else: + # Something else has gone wrong. + logger.error("Error trying to find object %s in bucket %s", key_name, bucket_name) + raise ValueError(f"Error trying to find object {key_name} in bucket {bucket_name}") from err + else: + # The object does exist. + return True + +def fetch_sso_instances(): + instances = [] + nextToken = None + try: + while True: + response = AWS_SSO_ADMIN_CLIENT.list_instances(NextToken=nextToken) if nextToken else AWS_SSO_ADMIN_CLIENT.list_instances() + instances = instances + response.get("Instances", []) + nextToken = response.get("NextToken") + if not nextToken: + break + except botocore.exceptions.ClientError as ex: + logger.info(f"fetch_sso_instances exception: {ex}") + ex.response["Error"]["Message"] = "InternalError" + ex.response["Error"]["Code"] = "InternalError" + raise ex + + return instances + +def fetch_sso_users(): + instances = fetch_sso_instances() + users_by_instance = {} + for i in instances: + if i.get("Status") != "Active": + continue + + instance_id = i.get("IdentityStoreId") + instance_arn = i.get("InstanceArn") + users_by_instance[instance_arn] = [] + nextToken = None + try: + while True: + response = AWS_IDENTITY_STORE_CLIENT.list_users(IdentityStoreId=instance_id, NextToken=nextToken) if nextToken else AWS_IDENTITY_STORE_CLIENT.list_users(IdentityStoreId=instance_id) + users_by_instance[instance_arn] = users_by_instance[instance_arn] + response.get("Users", []) + nextToken = response.get("NextToken") + if not nextToken: + break + except botocore.exceptions.ClientError as ex: + ex.response["Error"]["Message"] = "InternalError" + ex.response["Error"]["Code"] = "InternalError" + raise ex + return users_by_instance + +def policy_doc_gives_admin_access(policy_doc: str) -> bool: + document_dict = json.loads(policy_doc) + statement = document_dict.get("Statement", []) + for statement_component in statement: + if statement_component.get("Effect", "") == "Allow" and statement_component.get("Action", "") == "*" and statement_component.get("Resource", "") == "*": + return True + return False + +def fetch_inline_user_policies(user_name): + policies = [] + marker = None + try: + # fetching policies directly attached to the user + while True: + response = AWS_IAM_CLIENT.list_user_policies(UserName=user_name, Marker=marker) if marker else AWS_IAM_CLIENT.list_user_policies(UserName=user_name) + policies = policies + response.get("PolicyNames", []) + marker = response.get("Marker") + if not marker: + break + + # fetching policies the user has access to through groups + groups = [] + while True: + response = AWS_IAM_CLIENT.list_groups_for_user(UserName=user_name, Marker=marker) if marker else AWS_IAM_CLIENT.list_groups_for_user(UserName=user_name) + groups = groups + response.get("Groups", []) + for g in groups: + inner_marker = None + while True: + response = AWS_IAM_CLIENT.list_group_policies(GroupName=g.get("GroupName"), Marker=inner_marker) if inner_marker else AWS_IAM_CLIENT.list_group_policies(GroupName=g.get("GroupName")) + policies = policies + response.get("PolicyNames", []) + inner_marker = response.get("Marker") + if not inner_marker: + break + marker = response.get("Marker") + + if not marker: + break + + except botocore.exceptions.ClientError as ex: + if "NoSuchEntity" in ex.response['Error']['Code']: + ex.response['Error']['Message'] = f"Unable to fetch policies for user '{user_name}'. No such entity found." + elif "InvalidInput" in ex.response['Error']['Code']: + ex.response['Error']['Message'] = f"Invalid username '{user_name}' or marker '{marker}' input received." + else: + ex.response["Error"]["Message"] = "InternalError" + ex.response["Error"]["Code"] = "InternalError" + raise ex + + try: + for i in range(len(policies)): + policies[i] = AWS_IAM_CLIENT.get_user_policy(user_name, policies[i]) + except botocore.exceptions.ClientError as ex: + if "NoSuchEntity" in ex.response['Error']['Code']: + ex.response['Error']['Message'] = "Unable to fetch inline policy information. No such entity found." + elif "InvalidInput" in ex.response['Error']['Code']: + ex.response['Error']['Message'] = "Failed to fetch inline policy information. Invalid input." + else: + ex.response["Error"]["Message"] = "InternalError" + ex.response["Error"]["Code"] = "InternalError" + + return policies + +def fetch_aws_managed_user_policies(user_name): + policies = [] + marker = None + try: + # fetching policies directly attached to the user + while True: + response = AWS_IAM_CLIENT.list_attached_user_policies(UserName=user_name, Marker=marker) if marker else AWS_IAM_CLIENT.list_attached_user_policies(UserName=user_name) + policies = policies + response.get("AttachedPolicies", []) + marker = response.get("Marker") + if not marker: + break + + # fetching policies the user has access to through groups + groups = [] + while True: + response = AWS_IAM_CLIENT.list_groups_for_user(UserName=user_name, Marker=marker) if marker else AWS_IAM_CLIENT.list_groups_for_user(UserName=user_name) + groups = groups + response.get("Groups", []) + for g in groups: + inner_marker = None + while True: + response = AWS_IAM_CLIENT.list_attached_group_policies(GroupName=g.get("GroupName"), Marker=inner_marker) if inner_marker else AWS_IAM_CLIENT.list_attached_group_policies(GroupName=g.get("GroupName")) + policies = policies + response.get("AttachedPolicies", []) + inner_marker = response.get("Marker") + if not inner_marker: + break + marker = response.get("Marker") + + if not marker: + break + + return policies + except botocore.exceptions.ClientError as ex: + if "NoSuchEntity" in ex.response['Error']['Code']: + ex.response['Error']['Message'] = f"Unable to fetch policies for user '{user_name}'. No such entity found." + elif "InvalidInput" in ex.response['Error']['Code']: + ex.response['Error']['Message'] = f"Invalid user name '{user_name}' or marker '{marker}' input received." + else: + ex.response["Error"]["Message"] = "InternalError" + ex.response["Error"]["Code"] = "InternalError" + raise ex + +def fetch_customer_managed_policy_documents_for_permission_set(customer_managed_policies): + policy_docs = [] + try: + marker = None + while True: + response = AWS_IAM_CLIENT.list_policies(Scope="Local", Marker=marker) if marker else AWS_IAM_CLIENT.list_policies(Scope="Local") + policies = response.get("Policies") + for p in policies: + if p.get("PolicyName") in customer_managed_policies: + p_doc_response = AWS_IAM_CLIENT.get_policy_version(p.get(PolicyArn="Arn"), VersionId=p.get("DefaultVersionId")) + policy_docs.append(p_doc_response.get("PolicyVersion").get("Document")) + marker = response.get("Marker") + if not marker: + break + except botocore.exceptions.ClientError as ex: + ex.response["Error"]["Message"] = "InternalError" + ex.response["Error"]["Code"] = "InternalError" + raise ex + + return policy_docs + +def permission_set_has_admin_policies(instance_arn, permission_set_arn): + managed_policies = [] + inline_policies = [] + + next_token = None + try: + # Fetch all AWS Managed policies for the permission set and add them to the list of managed policies + while True: + response = AWS_SSO_ADMIN_CLIENT.list_managed_policies_in_permission_set(InstanceArn=instance_arn, NextToken=next_token, PermissionSetArn=permission_set_arn) if next_token else AWS_SSO_ADMIN_CLIENT.list_managed_policies_in_permission_set(InstanceArn=instance_arn, PermissionSetArn=permission_set_arn) + managed_policies = managed_policies + response.get("AttachedManagedPolicies") + next_token = response.get("NextToken") + if not next_token: + break + # Fetch the inline document for the permission set if any exists. If none exists the response will still be valid, just an empty policy doc. + response = AWS_SSO_ADMIN_CLIENT.get_inline_policy_for_permission_set(InstanceArn=instance_arn, PermissionSetArn=permission_set_arn) + # If length is less than or equal to 1 then the policy doc is empty because there is no inline policy.The API specifies a min length of 1, but is a bit vague on what an empty policy doc would look like so we are covering the case of empty string + if len(response.get("InlinePolicy")) > 1: + inline_policies.append({ + "PolicyDocument": response.get("InlinePolicy") + }) + # Fetch all customer managed policy references, convert the references into their policy document on the account, and add them to the list of inline policies. + while True: + response = AWS_SSO_ADMIN_CLIENT.list_customer_managed_policy_references_in_permission_set(InstanceArn=instance_arn, NextToken=next_token, PermissionSetArn=permission_set_arn) if next_token else AWS_SSO_ADMIN_CLIENT.list_customer_managed_policy_references_in_permission_set(InstanceArn=instance_arn, PermissionSetArn=permission_set_arn) + for policy_doc in fetch_customer_managed_policy_documents_for_permission_set([p.get("Name") for p in response.get("CustomerManagedPolicyReferences")]): + inline_policies.append({ + "PolicyDocument": policy_doc + }) + next_token = response.get("NextToken") + if not next_token: + break + + return policies_grant_admin_access(managed_policies, inline_policies) + except botocore.exceptions.ClientError as ex: + ex.response["Error"]["Message"] = "InternalError" + ex.response["Error"]["Code"] = "InternalError" + raise ex + +def get_organizations_mgmt_account_id(): + """ + This function returns the management account ID for the AWS Organizations + :return: + """ + management_account_id = "" + i_retry_limit = 10 + i_retries = 0 + b_completed = False + b_retry = True + while (b_retry and (not b_completed)) and (i_retries < i_retry_limit): + try: + response = AWS_ORGANIZATIONS_CLIENT.describe_organization() + if response: + organization = response.get("Organization", None) + if organization: + management_account_id = organization.get("MasterAccountId", "") + else: + logger.error("Unable to read the Organization from the dict") + else: + logger.error("Invalid response.") + b_completed = True + except botocore.exceptions.ClientError as ex: + if "AccessDenied" in ex.response["Error"]["Code"]: + logger.error("ACCESS DENIED when trying to describe_organization") + management_account_id = "ERROR" + b_retry = False + elif "AWSOrganizationsNotInUse" in ex.response["Error"]["Code"]: + logger.error("AWS Organizations not in use") + management_account_id = "ERROR" + b_retry = False + elif "ServiceException" in ex.response["Error"]["Code"]: + logger.error("AWS Organizations Service Exception") + management_account_id = "ERROR" + b_retry = False + elif ("ConcurrentModification" in ex.response["Error"]["Code"]) or ("TooManyRequests" in ex.response["Error"]["Code"]): + logger.info("AWS Organizations API is throttling requests or going through a modification. Will retry.") + time.sleep(2) + if i_retries >= i_retry_limit: + logger.error("Retry limit reached. Returning an error") + management_account_id = "ERROR" + b_retry = False + else: + i_retries += 1 + except ValueError: + logger.error("Unknown exception - get_organizations_mgmt_account_id") + management_account_id = "ERROR" + return management_account_id + +def get_admin_permission_sets_for_instance_by_account(instance_arn): + permission_sets = {} + next_token = None + try: + accounts = organizations_list_all_accounts() + for a in accounts: + account_id=a.get("Id") + permission_sets[account_id] = [] + while True: + response = AWS_SSO_ADMIN_CLIENT.list_permission_sets_provisioned_to_account(AccountId=AWS_ACCOUNT_ID,InstanceArn=instance_arn, NextToken = next_token) if next_token else AWS_SSO_ADMIN_CLIENT.list_permission_sets_provisioned_to_account(InstanceArn=instance_arn) + for p_set in response.get("PermissionSets"): + if permission_set_has_admin_policies(instance_arn, p_set): + permission_sets[account_id].append(p_set) + next_token = response.get("NextToken") + if not next_token: + break + except botocore.exceptions.ClientError as ex: + ex.response["Error"]["Message"] = "InternalError" + ex.response["Error"]["Code"] = "InternalError" + raise ex + + return permission_sets + +def user_assigned_to_permission_set(user_id, instance_arn, account_id, permission_set_arn): + try: + next_token = None + while True: + response = AWS_SSO_ADMIN_CLIENT.list_account_assignments(AccountId=account_id, InstanceArn=instance_arn, PermissionSetArn=permission_set_arn, NextToken=next_token) if next_token else AWS_SSO_ADMIN_CLIENT.list_account_assignments(AccountId=account_id, InstanceArn=instance_arn, PermissionSetArn=permission_set_arn) + for assignment in response.get("AccountAssignments"): + if assignment.get("PrincipalId") == user_id: + return True + next_token = response.get("NextToken") + if not next_token: + break + return False + except botocore.exceptions.ClientError as ex: + ex.response["Error"]["Message"] = "InternalError" + ex.response["Error"]["Code"] = "InternalError" + raise ex + +def check_users(iam_users, sso_users_by_instance, privileged_user_list, non_privileged_user_list, event): + evaluations = [] + at_least_one_privileged_user_has_admin_access = False + non_privileged_user_with_admin_access = [] + + for u in iam_users: + user_name = u.get("UserName") + logger.info(f"Checking iam users: {user_name}") + if user_name in privileged_user_list: + if at_least_one_privileged_user_has_admin_access == True: + continue + + managed_policies = fetch_aws_managed_user_policies(user_name) + inline_policies = fetch_inline_user_policies(user_name) + + at_least_one_privileged_user_has_admin_access = policies_grant_admin_access(managed_policies, inline_policies) + + elif user_name in non_privileged_user_list: + managed_policies = fetch_aws_managed_user_policies(user_name) + inline_policies = fetch_inline_user_policies(user_name) + if policies_grant_admin_access(managed_policies, inline_policies): + non_privileged_user_with_admin_access.append(user_name) + + for instance_arn in sso_users_by_instance.keys(): + admin_permission_sets_by_account=get_admin_permission_sets_for_instance_by_account(instance_arn) + for user in sso_users_by_instance[instance_arn]: + user_name = user.get("UserName") + logger.info(f"Checking sso instance user f{user_name}") + user_id = user.get("UserId") + if user_name in privileged_user_list: + if at_least_one_privileged_user_has_admin_access == True: + continue + + for a_id in admin_permission_sets_by_account.keys(): + for p_arn in admin_permission_sets_by_account[a_id]: + if user_assigned_to_permission_set(user_id, instance_arn, a_id, p_arn): + at_least_one_privileged_user_has_admin_access = True + break + + elif user_name in non_privileged_user_list: + for a_id in admin_permission_sets_by_account.keys(): + for p_arn in admin_permission_sets_by_account[a_id]: + if user_assigned_to_permission_set(user_id, instance_arn, a_id, p_arn): + non_privileged_user_with_admin_access.append(user_name) + + if at_least_one_privileged_user_has_admin_access and len(non_privileged_user_with_admin_access) == 0: + evaluations.append( + build_evaluation( + AWS_ACCOUNT_ID, + "COMPLIANT", + event, + DEFAULT_RESOURCE_TYPE + ) + ) + else: + failed_check_strings = [ + "no privileged user with admin access was found" if not at_least_one_privileged_user_has_admin_access else None, + f"non_privileged users {non_privileged_user_with_admin_access} have admin access" if len(non_privileged_user_with_admin_access) > 0 else None + ] + annotation = " and ".join([e for e in failed_check_strings if e is not None]).capitalize() + evaluations.append( + build_evaluation( + AWS_ACCOUNT_ID, + "NON_COMPLIANT", + event, + DEFAULT_RESOURCE_TYPE, + annotation + ) + ) + + return evaluations + +def organizations_list_all_accounts(interval_between_calls: float = 0.1) -> list[dict]: + """ + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/organizations/paginator/ListAccounts.html + """ + resources: list[dict] = [] + paginator = AWS_ORGANIZATIONS_CLIENT.get_paginator("list_accounts") + page_iterator = paginator.paginate() + for page in page_iterator: + resources.extend(page.get("Accounts", [])) + time.sleep(interval_between_calls) + return resources + +def policies_grant_admin_access(managed_policies, inline_policies): + return next((True for p in managed_policies if p.get("PolicyName", "") == "AdministratorAccess"), False) or next((True for p in inline_policies if policy_doc_gives_admin_access(p.get("PolicyDocument", '{}'))), False) + + +def lambda_handler(event, context): + """This function is the main entry point for Lambda. + Keyword arguments: + event -- the event variable given in the lambda handler + context -- the context variable given in the lambda handler + """ + global AWS_CONFIG_CLIENT + global AWS_ACCOUNT_ID + global AWS_IAM_CLIENT + global AWS_IDENTITY_STORE_CLIENT + global AWS_ORGANIZATIONS_CLIENT + global AWS_S3_CLIENT + global AWS_SSO_ADMIN_CLIENT + global EXECUTION_ROLE_NAME + global AUDIT_ACCOUNT_ID + + evaluations = [] + rule_parameters = {} + + invoking_event = json.loads(event["invokingEvent"]) + logger.info("Received event: %s", json.dumps(event, indent=2)) + + # parse parameters + AWS_ACCOUNT_ID = event["accountId"] + + if "ruleParameters" in event: + rule_parameters = json.loads(event["ruleParameters"]) + + valid_rule_parameters = evaluate_parameters(rule_parameters) + + if "ExecutionRoleName" in valid_rule_parameters: + EXECUTION_ROLE_NAME = valid_rule_parameters["ExecutionRoleName"] + else: + EXECUTION_ROLE_NAME = "AWSA-GCLambdaExecutionRole" + + if "AuditAccountID" in valid_rule_parameters: + AUDIT_ACCOUNT_ID = valid_rule_parameters["AuditAccountID"] + else: + AUDIT_ACCOUNT_ID = "" + + AWS_CONFIG_CLIENT = get_client("config", event) + AWS_IAM_CLIENT = get_client("iam", event) + AWS_SSO_ADMIN_CLIENT = get_client("sso-admin", event) + AWS_IDENTITY_STORE_CLIENT = get_client("identitystore", event) + AWS_S3_CLIENT = boto3.client("s3") + AWS_ORGANIZATIONS_CLIENT = get_client("organizations", event) + + # is this a scheduled invokation? + if is_scheduled_notification(invoking_event["messageType"]): + if AWS_ACCOUNT_ID == get_organizations_mgmt_account_id(): + # yes, proceed + privileged_users_file_path = valid_rule_parameters.get("PrivilegedUsersFilePath", "") + non_privileged_users_file_path = valid_rule_parameters.get("NonPrivilegedUsersFilePath", "") + if check_s3_object_exists(privileged_users_file_path) == False or check_s3_object_exists(non_privileged_users_file_path) == False: + evaluations.append( + build_evaluation( + event["accountId"], + "NON_COMPLIANT", + event, + resource_type=DEFAULT_RESOURCE_TYPE, + annotation="No privileged or non_privileged user file path input provided.", + ) + ) + else: + privileged_users_list = read_s3_object(privileged_users_file_path).splitlines() + non_privileged_users_file_path = read_s3_object(non_privileged_users_file_path).splitlines() + + iam_users = fetch_users() + sso_users_by_instance = fetch_sso_users() + + evaluations = evaluations + check_users(iam_users, sso_users_by_instance, privileged_users_list, non_privileged_users_file_path, event) + + logger.info(f"Updating evaluations: {evaluations}") + AWS_CONFIG_CLIENT.put_evaluations( + Evaluations=evaluations, + ResultToken=event["resultToken"] + ) diff --git a/src/lambda/gc01_check_dedicated_admin_account/events/event.json b/src/lambda/gc01_check_dedicated_admin_account/events/event.json new file mode 100644 index 00000000..a6197dea --- /dev/null +++ b/src/lambda/gc01_check_dedicated_admin_account/events/event.json @@ -0,0 +1,62 @@ +{ + "body": "{\"message\": \"hello world\"}", + "resource": "/hello", + "path": "/hello", + "httpMethod": "GET", + "isBase64Encoded": false, + "queryStringParameters": { + "foo": "bar" + }, + "pathParameters": { + "proxy": "/path/to/resource" + }, + "stageVariables": { + "baz": "qux" + }, + "headers": { + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", + "Accept-Encoding": "gzip, deflate, sdch", + "Accept-Language": "en-US,en;q=0.8", + "Cache-Control": "max-age=0", + "CloudFront-Forwarded-Proto": "https", + "CloudFront-Is-Desktop-Viewer": "true", + "CloudFront-Is-Mobile-Viewer": "false", + "CloudFront-Is-SmartTV-Viewer": "false", + "CloudFront-Is-Tablet-Viewer": "false", + "CloudFront-Viewer-Country": "US", + "Host": "1234567890.execute-api.us-east-1.amazonaws.com", + "Upgrade-Insecure-Requests": "1", + "User-Agent": "Custom User Agent String", + "Via": "1.1 08f323deadbeefa7af34d5feb414ce27.cloudfront.net (CloudFront)", + "X-Amz-Cf-Id": "cDehVQoZnx43VYQb9j2-nvCh-9z396Uhbp027Y2JvkCPNLmGJHqlaA==", + "X-Forwarded-For": "127.0.0.1, 127.0.0.2", + "X-Forwarded-Port": "443", + "X-Forwarded-Proto": "https" + }, + "requestContext": { + "accountId": "123456789012", + "resourceId": "123456", + "stage": "prod", + "requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef", + "requestTime": "09/Apr/2015:12:34:56 +0000", + "requestTimeEpoch": 1428582896000, + "identity": { + "cognitoIdentityPoolId": null, + "accountId": null, + "cognitoIdentityId": null, + "caller": null, + "accessKey": null, + "sourceIp": "127.0.0.1", + "cognitoAuthenticationType": null, + "cognitoAuthenticationProvider": null, + "userArn": null, + "userAgent": "Custom User Agent String", + "user": null + }, + "path": "/prod/hello", + "resourcePath": "/hello", + "httpMethod": "POST", + "apiId": "1234567890", + "protocol": "HTTP/1.1" + } +} diff --git a/src/lambda/gc01_check_dedicated_admin_account/requirements.txt b/src/lambda/gc01_check_dedicated_admin_account/requirements.txt new file mode 100644 index 00000000..e69de29b diff --git a/src/lambda/gc01_check_dedicated_admin_account/template.yaml b/src/lambda/gc01_check_dedicated_admin_account/template.yaml new file mode 100644 index 00000000..69bb03da --- /dev/null +++ b/src/lambda/gc01_check_dedicated_admin_account/template.yaml @@ -0,0 +1,19 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: > + gc01_check_dedicated_admin_account + +Globals: + Function: + Timeout: 180 + MemorySize: 128 + +Resources: + GC01CheckDedicatedAdminAccountLambda: + Type: AWS::Serverless::Function + Properties: + CodeUri: . + Handler: app.lambda_handler + Runtime: python3.9 + Architectures: + - x86_64 diff --git a/src/lambda/gc02_check_group_access_configuration/app.py b/src/lambda/gc02_check_group_access_configuration/app.py index 912ce074..65ec6165 100644 --- a/src/lambda/gc02_check_group_access_configuration/app.py +++ b/src/lambda/gc02_check_group_access_configuration/app.py @@ -1,4 +1,4 @@ -""" GC02 - Check IAM Password Policy +""" GC02 - Check Group Access Configuration https://canada-ca.github.io/cloud-guardrails/EN/02_Management-Admin-Privileges.html """ import json @@ -178,8 +178,8 @@ def check_group_policies(group_name, admin_accounts, event): inline_policies = fetch_inline_group_policies(group_name) # Checks for the aws managed policy AdministratorAccess or an inline policy that gives the same access. - has_admin_policy = next((True for p in managed_policies if p.get("PolicyName", "") == "AdministratorAccess"), False) or next((True for p in inline_policies if policy_doc_gives_admin_access(p.get("PolicyDocument", "\{\}"))), False) - has_non_admin_policy = next((True for p in managed_policies if p.get("PolicyName", "") != "AdministratorAccess"), False) or next((True for p in inline_policies if not policy_doc_gives_admin_access(p.get("PolicyDocument", "\{\}"))), False) + has_admin_policy = next((True for p in managed_policies if p.get("PolicyName", "") == "AdministratorAccess"), False) or next((True for p in inline_policies if policy_doc_gives_admin_access(p.get("PolicyDocument", "{}"))), False) + has_non_admin_policy = next((True for p in managed_policies if p.get("PolicyName", "") != "AdministratorAccess"), False) or next((True for p in inline_policies if not policy_doc_gives_admin_access(p.get("PolicyDocument", "{}"))), False) # Does the group have admin policies and non admin policies? if has_admin_policy and has_non_admin_policy: