Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternative implementation of the policy document parser #3246

Merged
merged 7 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
268 changes: 131 additions & 137 deletions azurelinuxagent/ga/policy/policy_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
# Requires Python 2.4+ and Openssl 1.0+
#

import copy
import json
import re
import os
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common import logger
Expand All @@ -26,25 +26,10 @@
from azurelinuxagent.common.protocol.extensions_goal_state_from_vm_settings import _CaseFoldedDict
from azurelinuxagent.common.utils.flexible_version import FlexibleVersion


# Schema for policy file.
_POLICY_SCHEMA = \
{
"policyVersion": ustr,
"extensionPolicies": {
"allowListedExtensionsOnly": bool,
"signatureRequired": bool,
"extensions": {
"<extensionName>": {
"signatureRequired": bool
}
}
}
}

# Default policy values to be used when customer does not specify these attributes in the policy file.
_DEFAULT_ALLOW_LISTED_EXTENSIONS_ONLY = False
_DEFAULT_SIGNATURE_REQUIRED = False
_DEFAULT_EXTENSIONS = {}

# Agent supports up to this version of the policy file ("policyVersion" in schema).
# Increment this number when any new attributes are added to the policy schema.
Expand Down Expand Up @@ -72,25 +57,11 @@ class _PolicyEngine(object):
"""
def __init__(self):
# Set defaults for policy
self._policy = \
{
"policyVersion": _MAX_SUPPORTED_POLICY_VERSION,
"extensionPolicies": {
"allowListedExtensionsOnly": _DEFAULT_ALLOW_LISTED_EXTENSIONS_ONLY,
"signatureRequired": _DEFAULT_SIGNATURE_REQUIRED,
"extensions": {}
}
}

self._policy_enforcement_enabled = self.__get_policy_enforcement_enabled()
if not self.policy_enforcement_enabled:
return

# Use a copy of the policy as a template. Update the template as we parse the custom policy.
template = copy.deepcopy(self._policy)
custom_policy = self.__read_policy()
self.__parse_policy(template, custom_policy)
self._policy = template
self._policy = self._parse_policy(self.__read_policy())

@staticmethod
def _log_policy_event(msg, is_success=True, op=WALAEventOperation.Policy, send_event=True):
Expand Down Expand Up @@ -142,139 +113,162 @@ def __read_policy():
return custom_policy

@staticmethod
def __parse_policy(template, custom_policy):
def _parse_policy(policy):
"""
Update template with attributes specified in custom_policy:
- attributes provided in custom_policy override the default values in template
- if an attribute is not provided, use the default value
- if an unrecognized attribute is present in custom_policy (not defined in _POLICY_SCHEMA), raise an error
- if an attribute does not match the type specified in the schema, raise an error
Parses the given policy document and returns an equivalent document that has been populated with default values and verified for correctness, i.e.
that conforms the following schema:

{
"policyVersion": "0.1.0",
"extensionPolicies": {
"allowListedExtensionsOnly": <true, false>, [Optional; default: false]
"signatureRequired": <true, false>, [Optional; default: false]
"extensions": { [Optional; default: {} (empty)]
"<extension_name>": {
"signatureRequired": <true, false> [Optional; no default]
"runtimePolicy": <extension-specific policy> [Optional; no default]
}
},
}
}

Raises InvalidPolicyError if the policy document is invalid.
"""
# Validate top level attributes and then parse each section of the custom policy.
# Individual parsing functions are responsible for validating schema of that section (any nested dicts).
# Note that validation must happen before parsing.
_PolicyEngine.__validate_schema(custom_policy, _POLICY_SCHEMA)
_PolicyEngine.__parse_version(template, custom_policy)
_PolicyEngine.__parse_extension_policies(template, custom_policy)
if not isinstance(policy, dict):
raise InvalidPolicyError("expected an object describing a Policy; got {0}.".format(type(policy).__name__))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: policy doesn't need to be capitalized

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks; in that message, I'm trying to refer to the concept/type of "Policy", rather than a specific "policy" instance.

if that does not make sense, i can lowercase it

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if that makes sense to me, but if it does to you, you can leave it uppercase


_PolicyEngine._check_attributes(policy, object_name="policy", valid_attributes=["policyVersion", "extensionPolicies"])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we get valid_attributes from a schema constant instead of hardcoding them here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i want them to be inline so that people can scan/read the code linearly, without having to jump to the definition of a constant to check what the valid attributes are


return {
"policyVersion": _PolicyEngine._parse_policy_version(policy),
"extensionPolicies": _PolicyEngine._parse_extension_policies(policy)
}

@staticmethod
def __parse_version(template, policy):
def _parse_policy_version(policy):
"""
Validate and return "policyVersion" attribute. If not a string in the format "x.y.z", raise InvalidPolicyError.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'If not a string in the format "x.y.z"'

nit: 'If not a string in the format "major[.minor[.patch]]"'

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks; updated

If policy_version is greater than maximum supported version, raise InvalidPolicyError.
"""
version = policy.get("policyVersion")
if version is None:
return
version = _PolicyEngine._get_string(policy, attribute="policyVersion")

try:
flexible_version = FlexibleVersion(version)
except ValueError:
raise InvalidPolicyError(
"invalid value for attribute 'policyVersion' attribute 'policyVersion' is expected to be in format 'major.minor.patch' "
"(e.g., '1.0.0'). Please change to a valid value.")
if not re.match(r"^\d+\.\d+\.\d+$", version):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also accept "1" or "1.0" as valid versions (I should have added a unit test for too).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the strict check since that is what the error message was specifically asking ('major.minor.patch'). Note, though, that this check should be done with a regex or similar, since FlexibleVersion accepts many, many other formats than 'major.minor.patch'.

"1", and "1.0" sound ok to me. If you want, you could add that after this change gets merged in

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add it in the next PR!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mgunnala - I needed to add some unit tests to address other comments, so I went ahead and made this change and added the corresponding unit tests,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!!

raise InvalidPolicyError("invalid value for attribute 'policyVersion': '{0}'; it should be in format 'major.minor.patch' (e.g., '1.0.0')".format(version))

if FlexibleVersion(_MAX_SUPPORTED_POLICY_VERSION) < flexible_version:
raise InvalidPolicyError("policy version '{0}' is not supported. The agent supports policy versions up to '{1}'. Please provide a compatible policy version."
.format(version, _MAX_SUPPORTED_POLICY_VERSION))
if FlexibleVersion(_MAX_SUPPORTED_POLICY_VERSION) < FlexibleVersion(version):
raise InvalidPolicyError("policy version '{0}' is not supported. The agent supports policy versions up to '{1}'.".format(version, _MAX_SUPPORTED_POLICY_VERSION))

template["policyVersion"] = version
return version

@staticmethod
def __parse_extension_policies(template, policy):
extension_policies = policy.get("extensionPolicies")
if extension_policies is not None:
_PolicyEngine.__validate_schema(extension_policies, _POLICY_SCHEMA["extensionPolicies"], "extensionPolicies")

# Parse allowlist policy
allowlist_policy = extension_policies.get("allowListedExtensionsOnly")
if allowlist_policy is not None:
template["extensionPolicies"]["allowListedExtensionsOnly"] = allowlist_policy
def _parse_extension_policies(policy):
"""
Parses the "extensionPolicies" attribute of the policy document. See _parse_policy() for schema.
"""
extension_policies = _PolicyEngine._get_dictionary(policy, attribute="extensionPolicies", optional=True, default={})

# Parse global signature policy
signature_policy = extension_policies.get("signatureRequired")
if signature_policy is not None:
template["extensionPolicies"]["signatureRequired"] = signature_policy
_PolicyEngine._check_attributes(extension_policies, object_name="extensionPolicies", valid_attributes=["allowListedExtensionsOnly", "signatureRequired", "extensions"])

# Parse individual extension policies
_PolicyEngine.__parse_extensions(template, extension_policies)
return {
"allowListedExtensionsOnly": _PolicyEngine._get_boolean(extension_policies, attribute="allowListedExtensionsOnly", name_prefix="extensionPolicies.", optional=True, default=_DEFAULT_ALLOW_LISTED_EXTENSIONS_ONLY),
"signatureRequired": _PolicyEngine._get_boolean(extension_policies, attribute="signatureRequired", name_prefix="extensionPolicies.", optional=True, default=_DEFAULT_SIGNATURE_REQUIRED),
"extensions": _PolicyEngine._parse_extensions(
_PolicyEngine._get_dictionary(extension_policies, attribute="extensions", name_prefix="extensionPolicies.", optional=True, default=_CaseFoldedDict.from_dict(_DEFAULT_EXTENSIONS))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_PolicyEngine._get_dictionary(extension_policies, attribute="extensions", name_prefix="extensionPolicies.", optional=True, default=_CaseFoldedDict.from_dict(_DEFAULT_EXTENSIONS))
_PolicyEngine._get_dictionary(extension_policies, attribute="extensions", name_prefix="extensionPolicies.", optional=True, default=__DEFAULT_EXTENSIONS)

nit: we only return CaseFoldedDict in _parse_extensions, so not necessary to create case folded dict here too

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point; removed

)
}

@staticmethod
def __parse_extensions(template, extensions_policy):
def _parse_extensions(extensions):
"""
Parse "extensions" dict and update in template.
"extensions" is expected to be in the format:
{
"extensions": {
"<extensionName>": {
"signatureRequired": bool
}
}
}
Parses the "extensions" attribute. See _parse_policy() for schema.
The return value is a case-folded dict. CRP allows extensions to be any case, so we allow for case-insensitive lookup of individual extension policies.
"""
parsed = _CaseFoldedDict.from_dict({})

for extension, extension_policy in extensions.items():
if not isinstance(extension_policy, dict):
raise InvalidPolicyError("invalid type {0} for attribute '{1}'; must be 'object'".format(type(extension_policy).__name__, extension))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we specify full path of the attribute in the policy doc here?
extensionPolicies.extensions.{extension}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup; added

parsed[extension] = _PolicyEngine._parse_extension(extension_policy)

If "signatureRequired" isn't provided, the global "signatureRequired" value will be used instead.
The "extensions" attribute will be converted to a case-folded dict. CRP allows extensions to be any
case, so we use case-folded dict to allow for case-insensitive lookup of individual extension policies.
return parsed

@staticmethod
def _parse_extension(extension):
"""
extensions = extensions_policy.get("extensions")
if extensions is None:
return
Parses an individual extension. See _parse_policy() for schema.
"""
extension_attribute_name = "extensionPolicies.extensions.{0}".format(extension)

_PolicyEngine._check_attributes(extension, object_name=extension_attribute_name, valid_attributes=["signatureRequired", "runtimePolicy"])

return_value = {}

signature_required = _PolicyEngine._get_boolean(extension, attribute="signatureRequired", name_prefix=extension_attribute_name, optional=True, default=None)
if signature_required is not None:
return_value["signatureRequired"] = signature_required

parsed_extensions_dict = {}
for extension_name, individual_policy in extensions.items():
# The runtimePolicy is an arbitrary object.
runtime_policy = extension.get("runtimePolicy")
if runtime_policy is not None:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to the schema defined spec, runtimePolicy is an object with arbitrary attributes, so we should probably validate that the type is dict.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or if we want to accept list, string, etc., update the schema and spec accordingly

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the extension policy is completely arbitrary and defined by the extension. spec needs to be updated

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a note about this to the spec

return_value["runtimePolicy"] = runtime_policy

# We don't validate "extensions" against the schema, because the attributes (individual extension names)
# are dynamic and not defined in the schema. We do validate that individual_policy is a dict, and validate
# the schema of individual_policy.
individual_policy_schema = _POLICY_SCHEMA["extensionPolicies"]["extensions"]["<extensionName>"]
if not isinstance(individual_policy, dict):
raise InvalidPolicyError("invalid type {0} for attribute '{1}', please change to object."
.format(type(individual_policy).__name__, extension_name))
_PolicyEngine.__validate_schema(individual_policy, individual_policy_schema, extension_name)
return return_value

@staticmethod
def _check_attributes(object_, object_name, valid_attributes):
"""
Check that the given object, which should be a dictionary, has only the specified attributes.
If any other attributes are present, raise InvalidPolicyError.
The object_name is used in the error message.
"""
for k in object_.keys():
if k not in valid_attributes:
raise InvalidPolicyError("invalid attribute '{0}' in {1}".format(k, object_name))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe "unrecognized" or "undefined" instead of "invalid" to make it clear that the attribute is not defined in the schema, as opposed to the type or format being invalid

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

invalid: (of computer instructions, data, etc.) not conforming to the correct format or specifications

your comment made me doubt :)

but i can change it to "unknown" if it helps

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your usage isn't wrong but "invalid" sounds to me like a format/type issue. I think "unknown" is clearer, but I'm also fine with leaving it as-is.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

went for "unrecognized", after all

thanks for the suggestion


extension_signature_policy = individual_policy.get("signatureRequired")
if extension_signature_policy is None:
extension_signature_policy = template["extensionPolicies"]["signatureRequired"]
@staticmethod
def _get_dictionary(object_, attribute, name_prefix="", optional=False, default=None):
"""
Returns object[attribute] if it exists, verifying that it is a dictionary, else returns default.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we update the comment to indicate this raises InvalidPolicyError if type verification fails

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, updated

If optional is False and object[attribute] does not exist, raise InvalidPolicyError.
The name_prefix indicates the path of the attribute within the policy document and is used in the error message.
"""
return _PolicyEngine._get_value(object_, attribute, name_prefix, dict, "object", optional=optional, default=default)

parsed_extensions_dict[extension_name] = \
{
"signatureRequired": extension_signature_policy
}
@staticmethod
def _get_string(object_, attribute, name_prefix="", optional=False, default=None):
"""
Returns object[attribute] if it exists, verifying that it is a string, else returns default.
maddieford marked this conversation as resolved.
Show resolved Hide resolved
If optional is False and object[attribute] does not exist, raise InvalidPolicyError.
The name_prefix indicates the path of the attribute within the policy document and is used in the error message.
"""
return _PolicyEngine._get_value(object_, attribute, name_prefix, ustr, "string", optional=optional, default=default)

# Convert "extensions" to a case-folded dict for case-insensitive lookup
case_folded_extensions_dict = _CaseFoldedDict.from_dict(parsed_extensions_dict)
template["extensionPolicies"]["extensions"] = case_folded_extensions_dict
@staticmethod
def _get_boolean(object_, attribute, name_prefix="", optional=False, default=None):
"""
Returns object[attribute] if it exists, verifying that it is a boolean, else returns default.
maddieford marked this conversation as resolved.
Show resolved Hide resolved
If optional is False and object[attribute] does not exist, raise InvalidPolicyError.
The name_prefix indicates the path of the attribute within the policy document and is used in the error message.
"""
return _PolicyEngine._get_value(object_, attribute, name_prefix, bool, "boolean", optional=optional, default=default)

@staticmethod
def __validate_schema(policy, schema, section_name=None):
def _get_value(object_, attribute, name_prefix, type_, type_name, optional, default):
"""
Validate the provided policy against the schema - we only do a shallow check (no recursion into nested dicts).
If there is an unrecognized attribute, raise an error.
Returns object[attribute] if it exists, verifying that it is of the given type_, else returns default.
maddieford marked this conversation as resolved.
Show resolved Hide resolved
If optional is False and object[attribute] does not exist, raise InvalidPolicyError.
The name_prefix indicates the path of the attribute within the policy document, the type_name indicates a user-friendly name for type_; both are used in the error message.
"""
for key, value in policy.items():
if key not in schema:
raise InvalidPolicyError("attribute '{0}' is not defined in the policy schema. Please refer to the policy documentation "
"and change or remove this attribute accordingly.".format(key))

expected_type = schema.get(key)
if isinstance(expected_type, dict):
expected_type = dict
type_in_err_msg = {
dict: "object",
ustr: "string",
bool: "boolean"
}

if not isinstance(value, expected_type):
if section_name is None:
msg = ("invalid type {0} for attribute '{1}', please change to {2}."
.format(type(value).__name__, key, type_in_err_msg.get(expected_type)))
else:
msg = ("invalid type {0} for attribute '{1}' in section '{2}', please change to {3}."
.format(type(value).__name__, key, section_name,
type_in_err_msg.get(expected_type)))

raise InvalidPolicyError(msg)
if default is not None and not optional:
raise ValueError("default value should only be provided for optional attributes")
value = object_.get(attribute)
if value is None:
if not optional:
raise InvalidPolicyError("missing required attribute '{0}{1}'".format(name_prefix, attribute))
return default
if not isinstance(value, type_):
raise InvalidPolicyError("invalid type {0} for attribute '{1}{2}'; must be '{3}'".format(type(value).__name__, name_prefix, attribute, type_name))
return value


class ExtensionPolicyEngine(_PolicyEngine):
Expand Down Expand Up @@ -311,7 +305,7 @@ def should_enforce_signature_validation(self, extension_to_check):

global_signature_required = self._policy.get("extensionPolicies").get("signatureRequired")
individual_policy = self._policy.get("extensionPolicies").get("extensions").get(extension_to_check.name)
if individual_policy is None:
if individual_policy is None or len(individual_policy) == 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if individual policy has 'runtimePolicy' attribute, but not 'signatureRequired' attribute?
Then we enter else condition and return None.

I think instead we should check

if individual_policy is None or individual_policy.get('signatureRequired') is None

and add unit test

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whoa, good catch!

I could try to explain why I missed this, but that would sound just like an excuse for my laziness. Updated code and added unit test.

@mgunnala - I refactored some of the unit tests as part of this

return global_signature_required
else:
return individual_policy.get("signatureRequired")
Loading
Loading