1
- """ GC03 - Check trusted devices admin access
2
- Confirm that administrative access to cloud environments is from approved and trusted locations and devices
1
+ """ GC03 - Check IAM/ CloudWatch Alarms
2
+ https://canada-ca.github.io/ cloud-guardrails/EN/03_Cloud-Console-Access.html
3
3
"""
4
4
5
5
import json
6
6
import logging
7
- import ipaddress
8
- import datetime
9
7
10
8
from utils import is_scheduled_notification , check_required_parameters , check_guardrail_requirement_by_cloud_usage_profile , get_cloud_profile_from_tags , GuardrailType , GuardrailRequirementType
11
9
from boto_util .organizations import get_account_tags , get_organizations_mgmt_account_id
12
10
from boto_util .client import get_client
13
11
from boto_util .config import build_evaluation , submit_evaluations
14
- from boto_util .iam import account_has_federated_users
15
- from boto_util .s3 import check_s3_object_exists , get_lines_from_s3_file
16
- from boto_util .cloud_trail import lookup_cloud_trail_events
12
+
13
+ import botocore .exceptions
17
14
18
15
# Logging setup
19
16
logger = logging .getLogger ()
20
17
logger .setLevel (logging .INFO )
21
18
22
19
23
- def ip_is_within_ranges (ip_addr : str , ip_cidr_ranges : list [str ]) -> bool :
24
- """Return true if the given IP Address is within the at least one of the given CIDR ranges, otherwise returns false"""
25
- for ip_range in ip_cidr_ranges :
26
- ip_network = ipaddress .ip_network (ip_range )
27
- if ipaddress .ip_address (ip_addr ) in ip_network :
28
- return True
29
- return False
20
+ def check_cloudwatch_alarms (
21
+ cloudwatch_client ,
22
+ alarm_names = [
23
+ "AWS-IAM-Authentication-From-Unapproved-IP" ,
24
+ "AWS-SSO-Authentication-From-Unapproved-IP" ,
25
+ "AWS-Console-SignIn-Without-MFA" ,
26
+ "AWSAccelerator-AWS-IAM-Authentication-From-Unapproved-IP" ,
27
+ "AWSAccelerator-AWS-SSO-Authentication-From-Unapproved-IP" ,
28
+ "AWSAccelerator-AWS-Console-SignIn-Without-MFA"
29
+ ],
30
+ ):
31
+ """Check CloudWatch alarms for compliance.
32
+ Keyword arguments:
33
+ alarm_names -- the list of CloudWatch alarms to check
34
+ """
35
+ result = {"status" : "NON_COMPLIANT" , "annotation" : "No alarms found" }
36
+ if len (alarm_names ) < 1 :
37
+ # no alarms to check
38
+ result = {
39
+ "status" : "COMPLIANT" ,
40
+ "annotation" : "No alarms checked for compliance" ,
41
+ }
42
+ return result
43
+ # initialize our lists
44
+ alarms_not_found = alarm_names
45
+ alarms_found = []
46
+ try :
47
+ # describe CloudWatch alarms
48
+ response = cloudwatch_client .describe_alarms (
49
+ AlarmNames = alarm_names ,
50
+ AlarmTypes = ["MetricAlarm" ],
51
+ )
52
+ # results may be paginated, and we may have to retry
53
+ b_more_data = True
54
+ i_retries = 0
55
+ i_retry_limit = 10
56
+ next_token = ""
57
+ while b_more_data and (i_retries < i_retry_limit ):
58
+ # did we get a response?
59
+ if response :
60
+ # yes
61
+ alarms_found .extend (response .get ("MetricAlarms" ))
62
+ # results paginated?
63
+ next_token = response .get ("NextToken" )
64
+ if next_token :
65
+ # yes
66
+ response = cloudwatch_client .describe_alarms (
67
+ AlarmNames = alarm_names ,
68
+ AlarmTypes = ["MetricAlarm" ],
69
+ NextToken = next_token ,
70
+ )
71
+ else :
72
+ # no more data
73
+ b_more_data = False
74
+ else :
75
+ logger .error ("Empty response. Retry call." )
76
+ i_retries += 1
77
+ if next_token :
78
+ response = cloudwatch_client .describe_alarms (
79
+ AlarmNames = alarm_names ,
80
+ AlarmTypes = ["MetricAlarm" ],
81
+ NextToken = next_token ,
82
+ )
83
+ else :
84
+ response = cloudwatch_client .describe_alarms (
85
+ AlarmNames = alarm_names ,
86
+ AlarmTypes = ["MetricAlarm" ],
87
+ )
88
+ # did we time out trying?
89
+ if i_retries >= i_retry_limit :
90
+ # yes
91
+ result ["annotation" ] = "Empty response while trying describe_alarms in CloudWatch API."
92
+ return result
93
+ except botocore .exceptions .ClientError as error :
94
+ logger .error ("Error while trying to describe_alarms - boto3 Client error - %s" , error )
95
+ result ["annotation" ] = "Error while trying to describe_alarms."
96
+ return result
97
+
98
+ # checking the alarms we found
99
+ alarms_not_found_set = set (alarms_not_found )
100
+ for alarm in alarms_found :
101
+ if not alarms_not_found_set :
102
+ # All alarms have been found, exit the loop
103
+ break
104
+ alarm_name = alarm .get ("AlarmName" )
105
+ if alarm_name :
106
+ for not_found_alarm in alarms_not_found_set :
107
+ if not_found_alarm in alarm_name :
108
+ logger .info ("CloudWatch Alarm %s found." , alarm_name )
109
+ alarms_not_found_set .remove (not_found_alarm )
110
+
111
+ # Stop the inner loop as we found a match
112
+ break
113
+
114
+ # prepare the annotation (if needed)
115
+ if len (alarms_not_found_set ) > 0 :
116
+ annotation = "Alarms not found: "
117
+ for alarm in alarms_not_found_set :
118
+ annotation += f"{ alarm } ; "
119
+ result ["annotation" ] = annotation
120
+ else :
121
+ result = {"status" : "COMPLIANT" , "annotation" : "All alarms found" }
122
+
123
+ logger .info (result )
124
+ return result
30
125
31
126
32
127
def lambda_handler (event , context ):
@@ -47,7 +142,7 @@ def lambda_handler(event, context):
47
142
return
48
143
49
144
rule_parameters = check_required_parameters (
50
- json .loads (event .get ("ruleParameters" , "{}" )), ["ExecutionRoleName" , "s3ObjectPath " ]
145
+ json .loads (event .get ("ruleParameters" , "{}" )), ["ExecutionRoleName" , "AlarmList " ]
51
146
)
52
147
execution_role_name = rule_parameters .get ("ExecutionRoleName" )
53
148
audit_account_id = rule_parameters .get ("AuditAccountID" , "" )
@@ -56,111 +151,64 @@ def lambda_handler(event, context):
56
151
57
152
evaluations = []
58
153
59
- aws_organizations_client = get_client ("organizations" , aws_account_id , execution_role_name )
154
+ try :
155
+ client = get_client ("organizations" )
156
+ response = client .describe_account (AccountId = aws_account_id )
157
+ account_status = response ["Account" ]["Status" ]
60
158
61
- if aws_account_id != get_organizations_mgmt_account_id (aws_organizations_client ):
62
- logger .info (
63
- "Cloud Trail events not checked in account %s as this is not the Management Account" , aws_account_id
64
- )
65
- return
159
+ logger .info (f"account_status is { account_status } " )
66
160
67
- aws_config_client = get_client ("config" , aws_account_id , execution_role_name )
68
- aws_s3_client = get_client ("s3" )
69
- aws_cloudtrail_client = get_client ("cloudtrail" , aws_account_id , execution_role_name )
70
- aws_iam_client = get_client ("iam" , aws_account_id , execution_role_name )
71
-
72
- # Check cloud profile
73
- tags = get_account_tags (get_client ("organizations" , assume_role = False ), aws_account_id )
74
- cloud_profile = get_cloud_profile_from_tags (tags )
75
- gr_requirement_type = check_guardrail_requirement_by_cloud_usage_profile (GuardrailType .Guardrail3 , cloud_profile )
76
-
77
- # If the guardrail is recommended
78
- if gr_requirement_type == GuardrailRequirementType .Recommended :
79
- return submit_evaluations (aws_config_client , event , [build_evaluation (
80
- aws_account_id ,
81
- "COMPLIANT" ,
82
- event ,
83
- gr_requirement_type = gr_requirement_type
84
- )])
85
- # If the guardrail is not required
86
- elif gr_requirement_type == GuardrailRequirementType .Not_Required :
87
- return submit_evaluations (aws_config_client , event , [build_evaluation (
88
- aws_account_id ,
89
- "NOT_APPLICABLE" ,
90
- event ,
91
- gr_requirement_type = gr_requirement_type
92
- )])
161
+ if account_status != "ACTIVE" :
162
+ return
163
+
164
+ aws_organizations_client = get_client ("organizations" , aws_account_id , execution_role_name )
165
+
166
+ if aws_account_id != get_organizations_mgmt_account_id (aws_organizations_client ):
167
+ logger .info (
168
+ "CloudWatch Alarms not checked in account %s as this is not the Management Account" ,
169
+ aws_account_id ,
170
+ )
171
+ return
93
172
94
- file_param_name = "s3ObjectPath"
95
- vpn_ip_ranges_file_path = rule_parameters .get (file_param_name , "" )
96
173
97
- if not check_s3_object_exists (aws_s3_client , vpn_ip_ranges_file_path ):
98
- annotation = f"No file found for s3 path '{ vpn_ip_ranges_file_path } ' via '{ file_param_name } ' input parameter."
99
- logger .info (annotation )
100
- evaluations .append (build_evaluation (aws_account_id , "NON_COMPLIANT" , event , annotation = annotation ))
101
- submit_evaluations (aws_config_client , event , evaluations )
102
- return
174
+ aws_config_client = get_client ("config" , aws_account_id , execution_role_name )
175
+ aws_cloudwatch_client = get_client ("cloudwatch" , aws_account_id , execution_role_name )
103
176
104
- vpn_ip_ranges = get_lines_from_s3_file (aws_s3_client , vpn_ip_ranges_file_path )
105
- logger .info ("vpn_ip_ranges from the file in s3: %s" , vpn_ip_ranges )
177
+ # Check cloud profile
178
+ tags = get_account_tags (get_client ("organizations" , assume_role = False ), aws_account_id )
179
+ cloud_profile = get_cloud_profile_from_tags (tags )
180
+ gr_requirement_type = check_guardrail_requirement_by_cloud_usage_profile (GuardrailType .Guardrail3 , cloud_profile )
181
+
182
+ # If the guardrail is recommended
183
+ if gr_requirement_type == GuardrailRequirementType .Recommended :
184
+ return submit_evaluations (aws_config_client , event , [build_evaluation (
185
+ aws_account_id ,
186
+ "COMPLIANT" ,
187
+ event ,
188
+ gr_requirement_type = gr_requirement_type
189
+ )])
190
+ # If the guardrail is not required
191
+ elif gr_requirement_type == GuardrailRequirementType .Not_Required :
192
+ return submit_evaluations (aws_config_client , event , [build_evaluation (
193
+ aws_account_id ,
194
+ "NOT_APPLICABLE" ,
195
+ event ,
196
+ gr_requirement_type = gr_requirement_type
197
+ )])
198
+
199
+ results = check_cloudwatch_alarms (
200
+ aws_cloudwatch_client , alarm_names = str (rule_parameters ["AlarmList" ]).split ("," )
201
+ )
202
+ if results :
203
+ compliance_type = results .get ("status" )
204
+ annotation = results .get ("annotation" )
205
+ else :
206
+ compliance_type = "NON_COMPLIANT"
207
+ annotation = "Unable to assess CloudWatch Alarms"
106
208
107
- if not vpn_ip_ranges :
108
- annotation = "No ip ranges found in input file."
109
- logger .info (annotation )
110
- evaluations .append (build_evaluation (aws_account_id , "NON_COMPLIANT" , event , annotation = annotation ))
209
+ logger .info (f"{ compliance_type } : { annotation } " )
210
+ evaluations .append (build_evaluation (aws_account_id , compliance_type , event , annotation = annotation ))
111
211
submit_evaluations (aws_config_client , event , evaluations )
112
- return
113
-
114
- bg_account_names = [rule_parameters ["BgUser1" ], rule_parameters ["BgUser2" ]]
115
- lookup_attributes = [{"AttributeKey" : "EventName" , "AttributeValue" : "ConsoleLogin" }]
116
- console_login_cloud_trail_events = lookup_cloud_trail_events (aws_cloudtrail_client , lookup_attributes )
117
- print ("INFO 1:" , console_login_cloud_trail_events )
118
- cloud_trail_events = [e for e in console_login_cloud_trail_events if e .get ("Username" ) not in bg_account_names ]
119
- num_compliant_rules = 0
120
- logger .info ("Number of events found: %s" , len (cloud_trail_events ))
121
-
122
- for lookup_event in cloud_trail_events :
123
- ct_event = json .loads (lookup_event .get ("CloudTrailEvent" , "{}" ))
124
- ct_event_id = ct_event .get ("eventID" , "" )
125
- ct_event_time = ct_event .get ("eventTime" , "" )
126
-
127
- # get event time and convert to ISO format
128
- ct_event_isotime = datetime .datetime .fromisoformat (ct_event_time .replace ('Z' , '+00:00' ))
129
- now = datetime .datetime .now (datetime .timezone .utc )
130
- # find difference from the current/execution time
131
- time_diff = now - ct_event_isotime
132
- # this is one day in ISO format
133
- one_day = datetime .timedelta (days = 1 )
134
-
135
- if time_diff > one_day :
136
- # skip if event entry is older than 1 day
137
- pass
138
- elif not ip_is_within_ranges (ct_event ["sourceIPAddress" ], vpn_ip_ranges ):
139
- compliance_type = "NON_COMPLIANT"
140
- annotation = f"Cloud Trail Event '{ ct_event_id } ' has a source IP address OUTSIDE of the allowed ranges, with event time '{ ct_event_time } '."
141
- logger .info (f"{ compliance_type } : { annotation } " )
142
- evaluations .append (build_evaluation (ct_event_id , compliance_type , event , "AWS::CloudTrail::Trail" , annotation ))
143
- else :
144
- num_compliant_rules = num_compliant_rules + 1
145
- compliance_type = "COMPLIANT"
146
- annotation = f"Cloud Trail Event '{ ct_event_id } ' has a source IP address inside of the allowed ranges."
147
- if account_has_federated_users (aws_iam_client ):
148
- annotation = f"{ annotation } Dependent on the compliance of the Federated IdP."
149
- logger .info (f"{ compliance_type } : { annotation } " )
150
- evaluations .append (build_evaluation (ct_event_id , compliance_type , event , "AWS::CloudTrail::Trail" , annotation ))
151
-
152
- #logger.info(f"{compliance_type}: {annotation}")
153
- #evaluations.append(build_evaluation(ct_event_id, compliance_type, event, "AWS::CloudTrail::Trail", annotation))
154
-
155
- if len (cloud_trail_events ) == num_compliant_rules :
156
- compliance_type = "COMPLIANT"
157
- annotation = "All Cloud Trail Events are within the allowed source IP address ranges."
158
- if account_has_federated_users (aws_iam_client ):
159
- annotation = f"All Cloud Trail Events are within the allowed source IP address ranges or are dependant on the federated identity provider."
160
- else :
161
- compliance_type = "NON_COMPLIANT"
162
- annotation = "NOT all Cloud Trail Events are within the allowed source IP address ranges."
163
212
164
- logger .info (f"{ compliance_type } : { annotation } " )
165
- evaluations .append (build_evaluation (aws_account_id , compliance_type , event , annotation = annotation ))
166
- submit_evaluations (aws_config_client , event , evaluations )
213
+ except :
214
+ logger .info ("This account Id is not active. Compliance evaluation not available for suspended accounts" )
0 commit comments