Skip to content

Commit

Permalink
indentation
Browse files Browse the repository at this point in the history
  • Loading branch information
EC2 Default User committed Feb 20, 2025
1 parent 2cd5b1c commit 3348563
Showing 1 changed file with 76 additions and 94 deletions.
Original file line number Diff line number Diff line change
@@ -1,95 +1,77 @@
import logging

import boto3
import botocore.exceptions

logger = logging.getLogger()
logger.setLevel(logging.INFO)


# This gets the client after assuming the Config service role
# either in the same AWS account or cross-account.
def get_clientt(
service: str,
account_id: str | None = None,
role_name: str | None = None,
assume_role: bool = True,
region: str | None = None,
endpoint_url: str | None = None,

):
"""
Return the service boto client. It should be used instead of directly calling the client.
This gets the client after assuming the Config service role for the provided account.
If no account_id or role_name is provided, the client is configured for the current credentials and account.
Keyword arguments:
service -- the service name used for calling the boto.client(service)
account_id -- the id of the account for the assumed role
role_name -- the name of the role to assume when creating the client
"""
if not role_name or not account_id or not assume_role:
return boto3.client(service,endpoint_url=endpoint_url)

credentials = get_assume_role_credentials(f"arn:aws:iam::{account_id}:role/{role_name}", region)
return boto3.client(
service,
endpoint_url=endpoint_url,
aws_access_key_id=credentials["AccessKeyId"],
aws_secret_access_key=credentials["SecretAccessKey"],
aws_session_token=credentials["SessionToken"],
)


def get_assume_role_credentials(role_arn: str, region: str = None) -> dict:
"""
Returns the credentials required to assume the passed role.
Keyword arguments:
role_arn -- the arn of the role to assume
"""
sts_client = boto3.client("sts", region_name=region) if region else boto3.client("sts")
try:
assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution")
return assume_role_response["Credentials"]
except botocore.exceptions.ClientError as ex:
# Scrub error message for any internal account info leaks
if "AccessDenied" in ex.response["Error"]["Code"]:
ex.response["Error"]["Message"] = "AWS Config does not have permission to assume the IAM role."
else:
ex.response["Error"]["Message"] = "InternalError"
ex.response["Error"]["Code"] = "InternalError"
logger.error("ERROR assuming role. %s", ex.response["Error"])
raise ex


def is_throttling_exception(e):
"""Returns True if the exception code is one of the throttling exception codes we have"""
b_is_throttling = False
throttling_exception_codes = [
"ConcurrentModificationException",
"InsufficientDeliveryPolicyException",
"NoAvailableDeliveryChannelException",
"ConcurrentModifications",
"LimitExceededException",
"OperationNotPermittedException",
"TooManyRequestsException",
"Throttling",
"ThrottlingException",
"InternalErrorException",
"InternalException",
"ECONNRESET",
"EPIPE",
"ETIMEDOUT",
]

for throttling_code in throttling_exception_codes:
if throttling_code in e.response["Error"]["Code"]:
b_is_throttling = True
break

return b_is_throttling
import boto3
import botocore.exceptions
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# This gets the client after assuming the Config service role
# either in the same AWS account or cross-account.
def get_clientt(
service: str,
account_id: str | None = None,
role_name: str | None = None,
assume_role: bool = True,
region: str | None = None,
endpoint_url: str | None = None,
):
"""
Return the service boto client. It should be used instead of directly calling the client.
This gets the client after assuming the Config service role for the provided account.
If no account_id or role_name is provided, the client is configured for the current credentials and account.
Keyword arguments:
service -- the service name used for calling the boto.client(service)
account_id -- the id of the account for the assumed role
role_name -- the name of the role to assume when creating the client
"""
if not role_name or not account_id or not assume_role:
return boto3.client(service,endpoint_url=endpoint_url)
credentials = get_assume_role_credentials(f"arn:aws:iam::{account_id}:role/{role_name}", region)
return boto3.client(
service,
endpoint_url=endpoint_url,
aws_access_key_id=credentials["AccessKeyId"],
aws_secret_access_key=credentials["SecretAccessKey"],
aws_session_token=credentials["SessionToken"],
)
def get_assume_role_credentials(role_arn: str, region: str = None) -> dict:
"""
Returns the credentials required to assume the passed role.
Keyword arguments:
role_arn -- the arn of the role to assume
"""
sts_client = boto3.client("sts", region_name=region) if region else boto3.client("sts")
try:
assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution")
return assume_role_response["Credentials"]
except botocore.exceptions.ClientError as ex:
# Scrub error message for any internal account info leaks
if "AccessDenied" in ex.response["Error"]["Code"]:
ex.response["Error"]["Message"] = "AWS Config does not have permission to assume the IAM role."
else:
ex.response["Error"]["Message"] = "InternalError"
ex.response["Error"]["Code"] = "InternalError"
logger.error("ERROR assuming role. %s", ex.response["Error"])
raise ex
def is_throttling_exception(e):
"""Returns True if the exception code is one of the throttling exception codes we have"""
b_is_throttling = False
throttling_exception_codes = [
"ConcurrentModificationException",
"InsufficientDeliveryPolicyException",
"NoAvailableDeliveryChannelException",
"ConcurrentModifications",
"LimitExceededException",
"OperationNotPermittedException",
"TooManyRequestsException",
"Throttling",
"ThrottlingException",
"InternalErrorException",
"InternalException",
"ECONNRESET",
"EPIPE",
"ETIMEDOUT",
]
for throttling_code in throttling_exception_codes:
if throttling_code in e.response["Error"]["Code"]:
b_is_throttling = True
break
return b_is_throttling

0 comments on commit 3348563

Please sign in to comment.