Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ omit =
# Schema tested by `make check-black`
samtranslator/schema/*
samtranslator/internal/schema_source/*
# Deprecated validator module
samtranslator/validator/validator.py
[report]
exclude_lines =
pragma: no cover
157 changes: 137 additions & 20 deletions samtranslator/model/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
""" CloudFormation Resource serialization, deserialization, and validation """
"""CloudFormation Resource serialization, deserialization, and validation"""

import inspect
import re
Expand All @@ -15,6 +15,7 @@
from samtranslator.model.tags.resource_tagging import get_tag_list
from samtranslator.model.types import IS_DICT, IS_STR, PassThrough, Validator, any_type, is_type
from samtranslator.plugins import LifeCycleEvents
from samtranslator.validator.property_rule import PropertyRules

RT = TypeVar("RT", bound=pydantic.BaseModel) # return type

Expand Down Expand Up @@ -163,7 +164,7 @@ def __init__(
self.set_resource_attribute(attr, value)

@classmethod
def get_supported_resource_attributes(cls): # type: ignore[no-untyped-def]
def get_supported_resource_attributes(cls) -> Tuple[str, ...]:
"""
A getter method for the supported resource attributes
returns: a tuple that contains the name of all supported resource attributes
Expand Down Expand Up @@ -205,7 +206,7 @@ def from_dict(cls, logical_id: str, resource_dict: Dict[str, Any], relative_id:

resource = cls(logical_id, relative_id=relative_id)

resource._validate_resource_dict(logical_id, resource_dict) # type: ignore[no-untyped-call]
resource._validate_resource_dict(logical_id, resource_dict)

# Default to empty properties dictionary. If customers skip the Properties section, an empty dictionary
# accurately captures the intent.
Expand Down Expand Up @@ -247,7 +248,7 @@ def _validate_logical_id(logical_id: Optional[Any]) -> str:
raise InvalidResourceException(str(logical_id), "Logical ids must be alphanumeric.")

@classmethod
def _validate_resource_dict(cls, logical_id, resource_dict): # type: ignore[no-untyped-def]
def _validate_resource_dict(cls, logical_id: str, resource_dict: Dict[str, Any]) -> None:
"""Validates that the provided resource dict contains the correct Type string, and the required Properties dict.

:param dict resource_dict: the resource dict to validate
Expand Down Expand Up @@ -335,22 +336,85 @@ def __setattr__(self, name, value): # type: ignore[no-untyped-def]
)

# Note: For compabitliy issue, we should ONLY use this with new abstraction/resources.
def validate_properties_and_return_model(self, cls: Type[RT]) -> RT:
def validate_properties_and_return_model(self, cls: Type[RT], collect_all_errors: bool = False) -> RT:
"""
Given a resource properties, return a typed object from the definitions of SAM schema model

param:
resource_properties: properties from input template
Args:
cls: schema models
collect_all_errors: If True, collect all validation errors. If False (default), only first error.
"""
try:
return cls.parse_obj(self._generate_resource_dict()["Properties"])
except pydantic.error_wrappers.ValidationError as e:
if collect_all_errors:
# Comprehensive error collection with union type consolidation
error_messages = self._format_all_errors(e.errors()) # type: ignore[arg-type]
raise InvalidResourceException(self.logical_id, " ".join(error_messages)) from e
error_properties: str = ""
with suppress(KeyError):
error_properties = ".".join(str(x) for x in e.errors()[0]["loc"])
raise InvalidResourceException(self.logical_id, f"Property '{error_properties}' is invalid.") from e

def _format_all_errors(self, errors: List[Dict[str, Any]]) -> List[str]:
"""Format all validation errors, consolidating union type errors in single pass."""
type_mapping = {
"not a valid dict": "dictionary",
"not a valid int": "integer",
"not a valid float": "number",
"not a valid list": "list",
"not a valid str": "string",
}

# Group errors by path in a single pass
path_to_errors: Dict[str, Dict[str, Any]] = {}

for error in errors:
property_path = ".".join(str(x) for x in error["loc"])
raw_message = error.get("msg", "")

# Extract type for union consolidation
extracted_type = None
for pattern, type_name in type_mapping.items():
if pattern in raw_message:
extracted_type = type_name
break

if property_path not in path_to_errors:
path_to_errors[property_path] = {"types": [], "error": error}

if extracted_type:
path_to_errors[property_path]["types"].append(extracted_type)

# Format messages based on collected data
result = []
for path, data in path_to_errors.items():
unique_types = list(dict.fromkeys(data["types"])) # Remove duplicates, preserve order

if len(unique_types) > 1:
# Multiple types - consolidate with union
type_text = " or ".join(unique_types)
result.append(f"Property '{path}' value must be {type_text}.")
elif len(unique_types) == 1:
# Single type - use type mapping
result.append(f"Property '{path}' value must be {unique_types[0]}.")
else:
# No types - format normally
result.append(self._format_single_error(data["error"]))

return result

def _format_single_error(self, error: Dict[str, Any]) -> str:
"""Format a single Pydantic error into user-friendly message."""
property_path = ".".join(str(x) for x in error["loc"])
raw_message = error["msg"]

if error["type"] == "value_error.missing":
return f"Property '{property_path}' is required."
if "extra fields not permitted" in raw_message:
return f"Property '{property_path}' is an invalid property."
return f"Property '{property_path}' {raw_message.lower()}."

def validate_properties(self) -> None:
"""Validates that the required properties for this Resource have been populated, and that all properties have
valid values.
Expand Down Expand Up @@ -508,6 +572,10 @@ class SamResourceMacro(ResourceMacro, metaclass=ABCMeta):
# Aggregate list of all reserved tags
_RESERVED_TAGS = [_SAM_KEY, _SAR_APP_KEY, _SAR_SEMVER_KEY]

def get_property_validation_rules(self) -> Optional[PropertyRules]:
"""Override this method in child classes to provide PropertyRules validation."""
return None

def get_resource_references(self, generated_cfn_resources, supported_resource_refs): # type: ignore[no-untyped-def]
"""
Constructs the list of supported resource references by going through the list of CFN resources generated
Expand Down Expand Up @@ -536,14 +604,14 @@ def get_resource_references(self, generated_cfn_resources, supported_resource_re
def _construct_tag_list(
self, tags: Optional[Dict[str, Any]], additional_tags: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
if not bool(tags):
tags = {}
tags_dict: Dict[str, Any] = tags or {}

if additional_tags is None:
additional_tags = {}

# At this point tags is guaranteed to be a Dict[str, Any] since we set it to {} if it was falsy
for tag in self._RESERVED_TAGS:
self._check_tag(tag, tags) # type: ignore[no-untyped-call]
self._check_tag(tag, tags_dict)

sam_tag = {self._SAM_KEY: self._SAM_VALUE}

Expand All @@ -553,6 +621,40 @@ def _construct_tag_list(
# customer's knowledge.
return get_tag_list(sam_tag) + get_tag_list(additional_tags) + get_tag_list(tags)

@staticmethod
def propagate_tags_combine(
resources: List[Resource], tags: Optional[Dict[str, Any]], propagate_tags: Optional[bool] = False
) -> None:
"""
Propagates tags to the resources
Similar to propagate_tags() method but this method will combine provided tags with existing resource tags.

Note:
- This method created after propagate_tags() to combine propagate tags with resource tags create during to_cloudformation()
- Create this method because updating propagate_tags() will cause regression issue;
- Use this method for new resource if you want to assign combined tags, not replace.

:param propagate_tags: Whether we should pass the tags to generated resources.
:param resources: List of generated resources
:param tags: dictionary of tags to propagate to the resources.

:return: None
"""
if not propagate_tags or not tags:
return

for resource in resources:
if hasattr(resource, "Tags"):
if resource.Tags:
propagated_tags = get_tag_list(tags)
combined_tags = [
{"Key": k, "Value": v}
for k, v in {tag["Key"]: tag["Value"] for tag in resource.Tags + propagated_tags}.items()
]
resource.Tags = combined_tags
else:
resource.assign_tags(tags)

@staticmethod
def propagate_tags(
resources: List[Resource], tags: Optional[Dict[str, Any]], propagate_tags: Optional[bool] = False
Expand All @@ -572,7 +674,7 @@ def propagate_tags(
for resource in resources:
resource.assign_tags(tags)

def _check_tag(self, reserved_tag_name, tags): # type: ignore[no-untyped-def]
def _check_tag(self, reserved_tag_name: str, tags: Dict[str, Any]) -> None:
if reserved_tag_name in tags:
raise InvalidResourceException(
self.logical_id,
Expand All @@ -582,6 +684,21 @@ def _check_tag(self, reserved_tag_name, tags): # type: ignore[no-untyped-def]
"input.",
)

def validate_before_transform(self, schema_class: Type[RT]) -> None:
rules = self.get_property_validation_rules()
if rules is None:
return

# Validate properties and get model, then pass to rules
try:
validated_model = self.validate_properties_and_return_model(schema_class)
except Exception:
validated_model = None

error_messages = rules.validate_all(validated_model)
if error_messages:
raise InvalidResourceException(self.logical_id, "\n".join(error_messages))


class ResourceTypeResolver:
"""ResourceTypeResolver maps Resource Types to Resource classes, e.g. AWS::Serverless::Function to
Expand Down Expand Up @@ -643,7 +760,7 @@ def get_all_resources(self) -> Dict[str, Any]:
"""Return a dictionary of all resources from the SAM template."""
return self.resources

def get_resource_by_logical_id(self, _input: str) -> Dict[str, Any]:
def get_resource_by_logical_id(self, _input: str) -> Optional[Dict[str, Any]]:
"""
Recursively find resource with matching Logical ID that are present in the template and returns the value.
If it is not in template, this method simply returns the input unchanged.
Expand All @@ -661,16 +778,16 @@ def get_resource_by_logical_id(self, _input: str) -> Dict[str, Any]:
__all__: List[str] = [
"IS_DICT",
"IS_STR",
"Validator",
"any_type",
"is_type",
"PropertyType",
"Property",
"PassThroughProperty",
"MutatedPassThroughProperty",
"PassThroughProperty",
"Property",
"PropertyType",
"Resource",
"ResourceMacro",
"SamResourceMacro",
"ResourceTypeResolver",
"ResourceResolver",
"ResourceTypeResolver",
"SamResourceMacro",
"Validator",
"any_type",
"is_type",
]
24 changes: 23 additions & 1 deletion samtranslator/model/sam_resources.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
""" SAM macro definitions """
"""SAM macro definitions"""

import copy
import re
Expand Down Expand Up @@ -139,6 +139,7 @@ class SamFunction(SamResourceMacro):
"""SAM function macro."""

resource_type = "AWS::Serverless::Function"

property_types = {
"FunctionName": PropertyType(False, one_of(IS_STR, IS_DICT)),
"Handler": PassThroughProperty(False),
Expand Down Expand Up @@ -252,6 +253,21 @@ class SamFunction(SamResourceMacro):
"DestinationQueue": SQSQueue.resource_type,
}

# def get_property_validation_rules(self) -> Optional[PropertyRules]:
# """Override to provide PropertyRules validation for SAM Function."""
# TODO: To enable these rules, we need to update translator test input/output files to property configure template
# to avoid fail-fast. eg: test with DeploymentPreference without AutoPublishAlias would fail fast before reaching testing state
# from samtranslator.internal.schema_source.aws_serverless_function import Properties as FunctionProperties
# return (PropertyRules(FunctionProperties)
# .addMutuallyExclusive("ImageUri", "InlineCode", "CodeUri")
# .addConditionalInclusive("DeploymentPreference", ["AutoPublishAlias"])
# .addConditionalInclusive("ProvisionedConcurrencyConfig", ["AutoPublishAlias"])
# .addConditionalInclusive("PackageType=Zip", ["Runtime", "Handler"])
# .addConditionalInclusive("PackageType=Image", ["ImageUri"])
# .addConditionalExclusive("PackageType=Zip", ["ImageUri", "ImageConfig"])
# .addConditionalExclusive("PackageType=Image", ["Runtime", "Handler", "Layers"]))
# return None

Comment on lines +256 to +270
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we plan to leave this here, commented out?

def resources_to_link(self, resources: Dict[str, Any]) -> Dict[str, Any]:
try:
return {"event_resources": self._event_resources_to_link(resources)}
Expand All @@ -274,6 +290,11 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P
conditions = kwargs.get("conditions", {})
feature_toggle = kwargs.get("feature_toggle")

# TODO: Skip pass schema_class=aws_serverless_function.Properties to skip schema validation for now.
# - adding this now would required update error message in error error_function_*_test.py
# - add this when we can verify that changing error message would not break customers
# self.validate_before_transform(schema_class=aws_serverless_function.Properties)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this validate_before_transform that appears here? Is it defined somewhere else?


if self.DeadLetterQueue:
self._validate_dlq(self.DeadLetterQueue)

Expand Down Expand Up @@ -1857,6 +1878,7 @@ def _validate_architectures(self, lambda_layer: LambdaLayerVersion) -> None:
# Intrinsics are not validated
if is_intrinsic(architectures):
return

for arq in architectures:
# We validate the values only if we they're not intrinsics
if not is_intrinsic(arq) and arq not in [ARM64, X86_64]:
Expand Down
Loading