diff --git a/examples/policy_set.py b/examples/policy_set.py new file mode 100644 index 0000000..f6da973 --- /dev/null +++ b/examples/policy_set.py @@ -0,0 +1,446 @@ +from __future__ import annotations + +import argparse +import os + +from pytfe import TFEClient, TFEConfig +from pytfe.models import ( + Policy, + PolicyKind, + PolicySetAddPoliciesOptions, + PolicySetAddProjectsOptions, + PolicySetAddWorkspacesOptions, + PolicySetCreateOptions, + PolicySetListOptions, + PolicySetRemovePoliciesOptions, + PolicySetRemoveProjectsOptions, + PolicySetRemoveWorkspacesOptions, + PolicySetUpdateOptions, + Project, + Workspace, +) + + +def _print_header(title: str): + print("\n" + "=" * 80) + print(title) + print("=" * 80) + + +def _print_policy_set_info(ps): + """Helper function to print policy set information.""" + print(f"ID: {ps.id}") + print(f"Name: {ps.name}") + print(f"Description: {ps.description}") + print(f"Kind: {ps.kind}") + print(f"Global: {ps.Global}") + print(f"Overridable: {ps.overridable}") + print(f"Agent Enabled: {ps.agent_enabled}") + print(f"Policy Tool Version: {ps.policy_tool_version}") + print(f"Policies Path: {ps.policies_path}") + print(f"Policy Count: {ps.policy_count}") + print(f"Workspace Count: {ps.workspace_count}") + print(f"Project Count: {ps.project_count}") + print(f"Created At: {ps.created_at}") + print(f"Updated At: {ps.updated_at}") + + if ps.vcs_repo: + print(f"VCS Repo: {ps.vcs_repo.identifier}") + + if ps.workspaces: + print( + f"Workspaces: {[w.id if hasattr(w, 'id') else str(w) for w in ps.workspaces]}" + ) + + if ps.projects: + print( + f"Projects: {[p.id if hasattr(p, 'id') else str(p) for p in ps.projects]}" + ) + + if ps.policies: + print( + f"Policies: {[p.id if hasattr(p, 'id') else str(p) for p in ps.policies]}" + ) + + if ps.workspace_exclusions: + print( + f"Workspace Exclusions: {[w.id if hasattr(w, 'id') else str(w) for w in ps.workspace_exclusions]}" + ) + + +def main(): + parser = argparse.ArgumentParser(description="Policy Sets demo for python-tfe SDK") + parser.add_argument( + "--address", default=os.getenv("TFE_ADDRESS", "https://app.terraform.io") + ) + parser.add_argument("--token", default=os.getenv("TFE_TOKEN", "")) + parser.add_argument("--org", required=True, help="Organization name") + parser.add_argument( + "--policy-set-id", help="Policy Set ID for read/update/delete operations" + ) + parser.add_argument( + "--policy-set-name", help="Policy Set name for create operation" + ) + parser.add_argument( + "--description", help="Policy Set description for create operation" + ) + parser.add_argument( + "--kind", choices=["opa", "sentinel"], help="Policy Set kind (opa or sentinel)" + ) + parser.add_argument("--global", action="store_true", help="Make policy set global") + parser.add_argument( + "--overridable", action="store_true", help="Make policy set overridable" + ) + parser.add_argument( + "--agent-enabled", action="store_true", help="Enable agent for policy set" + ) + parser.add_argument( + "--workspace-ids", nargs="+", help="Workspace IDs to associate with policy set" + ) + parser.add_argument( + "--project-ids", nargs="+", help="Project IDs to associate with policy set" + ) + parser.add_argument( + "--policy-ids", nargs="+", help="Policy IDs to associate with policy set" + ) + parser.add_argument("--create", action="store_true", help="Create a new policy set") + parser.add_argument("--read", action="store_true", help="Read a policy set") + parser.add_argument("--update", action="store_true", help="Update a policy set") + parser.add_argument("--delete", action="store_true", help="Delete a policy set") + parser.add_argument( + "--add-workspaces", action="store_true", help="Add workspaces to policy set" + ) + parser.add_argument( + "--remove-workspaces", + action="store_true", + help="Remove workspaces from policy set", + ) + parser.add_argument( + "--add-projects", action="store_true", help="Add projects to policy set" + ) + parser.add_argument( + "--remove-projects", action="store_true", help="Remove projects from policy set" + ) + parser.add_argument( + "--add-policies", action="store_true", help="Add policies to policy set" + ) + parser.add_argument( + "--remove-policies", action="store_true", help="Remove policies from policy set" + ) + parser.add_argument("--search", help="Search policy sets by name") + parser.add_argument("--page", type=int, default=1) + parser.add_argument("--page-size", type=int, default=20) + args = parser.parse_args() + + if not args.token: + print("Error: TFE_TOKEN environment variable or --token argument is required") + return + + cfg = TFEConfig(address=args.address, token=args.token) + client = TFEClient(cfg) + + # 1) List all policy sets for the organization + _print_header(f"Listing policy sets for organization: {args.org}") + + list_options = PolicySetListOptions( + page_number=args.page, + page_size=args.page_size, + ) + + # Only add includes if we want to test them + # list_options.include = [ + # PolicySetIncludeOpt.POLICY_SET_WORKSPACES, + # PolicySetIncludeOpt.POLICY_SET_PROJECTS, + # PolicySetIncludeOpt.POLICY_SET_POLICIES, + # ] + + if args.search: + list_options.search = args.search + + if args.kind: + list_options.kind = ( + PolicyKind.OPA if args.kind == "opa" else PolicyKind.SENTINEL + ) + + try: + ps_list = client.policy_sets.list(args.org, list_options) + + print(f"Total policy sets: {ps_list.total_count}") + print(f"Page {ps_list.current_page} of {ps_list.total_pages}") + print() + + if not ps_list.items: + print("No policy sets found for this organization.") + else: + for ps in ps_list.items: + print( + f"- ID: {ps.id} | Name: {ps.name} | Kind: {ps.kind} | Global: {ps.Global}" + ) + print( + f" Policy Count: {ps.policy_count} | Workspace Count: {ps.workspace_count}" + ) + print(f" Created: {ps.created_at}") + print() + + except Exception as e: + print(f"Error listing policy sets: {e}") + return + + # 2) Create a new policy set + if args.create: + if not args.policy_set_name: + print("Error: --policy-set-name is required for create operation") + return + + _print_header(f"Creating policy set: {args.policy_set_name}") + + try: + create_options = PolicySetCreateOptions( + name=args.policy_set_name, + description=args.description, + Global=args.global_policy if hasattr(args, "global_policy") else False, + overridable=args.overridable, + agent_enabled=args.agent_enabled, + ) + + if args.kind: + create_options.kind = ( + PolicyKind.OPA if args.kind == "opa" else PolicyKind.SENTINEL + ) + + # Add workspaces if provided + if args.workspace_ids: + create_options.workspaces = [ + Workspace(id=ws_id) for ws_id in args.workspace_ids + ] + + # Add projects if provided + if args.project_ids: + create_options.projects = [ + Project(id=proj_id) for proj_id in args.project_ids + ] + + # Add policies if provided + if args.policy_ids: + create_options.policies = [ + Policy(id=pol_id) for pol_id in args.policy_ids + ] + + new_ps = client.policy_sets.create(args.org, create_options) + print("Successfully created policy set!") + _print_policy_set_info(new_ps) + + except Exception as e: + print(f"Error creating policy set: {e}") + + # 3) Read a specific policy set + if args.read: + if not args.policy_set_id: + print("Error: --policy-set-id is required for read operation") + return + + _print_header(f"Reading policy set: {args.policy_set_id}") + + try: + # read_options = PolicySetReadOptions( + # include=[ + # PolicySetIncludeOpt.POLICY_SET_WORKSPACES, + # PolicySetIncludeOpt.POLICY_SET_PROJECTS, + # PolicySetIncludeOpt.POLICY_SET_POLICIES, + # PolicySetIncludeOpt.POLICY_SET_WORKSPACE_EXCLUSIONS, + # ] + # ) + + ps = client.policy_sets.read(args.policy_set_id) + _print_policy_set_info(ps) + + except Exception as e: + print(f"Error reading policy set: {e}") + + # 4) Update a policy set + if args.update: + if not args.policy_set_id: + print("Error: --policy-set-id is required for update operation") + return + + _print_header(f"Updating policy set: {args.policy_set_id}") + + try: + update_options = PolicySetUpdateOptions() + + if args.policy_set_name: + update_options.name = args.policy_set_name + if args.description: + update_options.description = args.description + if hasattr(args, "global_policy"): + update_options.Global = args.global_policy + if args.overridable: + update_options.overridable = args.overridable + if args.agent_enabled: + update_options.agent_enabled = args.agent_enabled + + updated_ps = client.policy_sets.update(args.policy_set_id, update_options) + print("Successfully updated policy set!") + _print_policy_set_info(updated_ps) + + except Exception as e: + print(f"Error updating policy set: {e}") + + # 5) Add workspaces to policy set + if args.add_workspaces: + if not args.policy_set_id or not args.workspace_ids: + print( + "Error: --policy-set-id and --workspace-ids are required for add-workspaces operation" + ) + return + + _print_header(f"Adding workspaces to policy set: {args.policy_set_id}") + + try: + add_ws_options = PolicySetAddWorkspacesOptions( + workspaces=[Workspace(id=ws_id) for ws_id in args.workspace_ids] + ) + + client.policy_sets.add_workspaces(args.policy_set_id, add_ws_options) + print( + f"Successfully added {len(args.workspace_ids)} workspaces to policy set!" + ) + + except Exception as e: + print(f"Error adding workspaces: {e}") + + # 6) Remove workspaces from policy set + if args.remove_workspaces: + if not args.policy_set_id or not args.workspace_ids: + print( + "Error: --policy-set-id and --workspace-ids are required for remove-workspaces operation" + ) + return + + _print_header(f"Removing workspaces from policy set: {args.policy_set_id}") + + try: + remove_ws_options = PolicySetRemoveWorkspacesOptions( + workspaces=[Workspace(id=ws_id) for ws_id in args.workspace_ids] + ) + + client.policy_sets.remove_workspaces(args.policy_set_id, remove_ws_options) + print( + f"Successfully removed {len(args.workspace_ids)} workspaces from policy set!" + ) + + except Exception as e: + print(f"Error removing workspaces: {e}") + + # 7) Add projects to policy set + if args.add_projects: + if not args.policy_set_id or not args.project_ids: + print( + "Error: --policy-set-id and --project-ids are required for add-projects operation" + ) + return + + _print_header(f"Adding projects to policy set: {args.policy_set_id}") + + try: + add_proj_options = PolicySetAddProjectsOptions( + projects=[Project(id=proj_id) for proj_id in args.project_ids] + ) + + client.policy_sets.add_projects(args.policy_set_id, add_proj_options) + print(f"Successfully added {len(args.project_ids)} projects to policy set!") + + except Exception as e: + print(f"Error adding projects: {e}") + + # 8) Remove projects from policy set + if args.remove_projects: + if not args.policy_set_id or not args.project_ids: + print( + "Error: --policy-set-id and --project-ids are required for remove-projects operation" + ) + return + + _print_header(f"Removing projects from policy set: {args.policy_set_id}") + + try: + remove_proj_options = PolicySetRemoveProjectsOptions( + projects=[Project(id=proj_id) for proj_id in args.project_ids] + ) + + client.policy_sets.remove_projects(args.policy_set_id, remove_proj_options) + print( + f"Successfully removed {len(args.project_ids)} projects from policy set!" + ) + + except Exception as e: + print(f"Error removing projects: {e}") + + # 9) Add policies to policy set + if args.add_policies: + if not args.policy_set_id or not args.policy_ids: + print( + "Error: --policy-set-id and --policy-ids are required for add-policies operation" + ) + return + + _print_header(f"Adding policies to policy set: {args.policy_set_id}") + + try: + add_pol_options = PolicySetAddPoliciesOptions( + policies=[Policy(id=pol_id) for pol_id in args.policy_ids] + ) + + client.policy_sets.add_policies(args.policy_set_id, add_pol_options) + print(f"Successfully added {len(args.policy_ids)} policies to policy set!") + + except Exception as e: + print(f"Error adding policies: {e}") + + # 10) Remove policies from policy set + if args.remove_policies: + if not args.policy_set_id or not args.policy_ids: + print( + "Error: --policy-set-id and --policy-ids are required for remove-policies operation" + ) + return + + _print_header(f"Removing policies from policy set: {args.policy_set_id}") + + try: + remove_pol_options = PolicySetRemovePoliciesOptions( + policies=[Policy(id=pol_id) for pol_id in args.policy_ids] + ) + + client.policy_sets.remove_policies(args.policy_set_id, remove_pol_options) + print( + f"Successfully removed {len(args.policy_ids)} policies from policy set!" + ) + + except Exception as e: + print(f"Error removing policies: {e}") + + # 11) Delete a policy set + if args.delete: + if not args.policy_set_id: + print("Error: --policy-set-id is required for delete operation") + return + + _print_header(f"Deleting policy set: {args.policy_set_id}") + + try: + confirmation = input( + f"Are you sure you want to delete policy set {args.policy_set_id}? (y/N): " + ) + if confirmation.lower() == "y": + client.policy_sets.delete(args.policy_set_id) + print(f"Successfully deleted policy set: {args.policy_set_id}") + else: + print("Delete operation cancelled.") + + except Exception as e: + print(f"Error deleting policy set: {e}") + + +if __name__ == "__main__": + main() diff --git a/src/pytfe/client.py b/src/pytfe/client.py index aca6c6d..8af2a38 100644 --- a/src/pytfe/client.py +++ b/src/pytfe/client.py @@ -12,6 +12,7 @@ from .resources.plan import Plans from .resources.policy import Policies from .resources.policy_check import PolicyChecks +from .resources.policy_set import PolicySets from .resources.projects import Projects from .resources.query_run import QueryRuns from .resources.registry_module import RegistryModules @@ -76,6 +77,7 @@ def __init__(self, config: TFEConfig | None = None): self.run_events = RunEvents(self._transport) self.policies = Policies(self._transport) self.policy_checks = PolicyChecks(self._transport) + self.policy_sets = PolicySets(self._transport) # SSH Keys self.ssh_keys = SSHKeys(self._transport) diff --git a/src/pytfe/errors.py b/src/pytfe/errors.py index fcab482..393e1d8 100644 --- a/src/pytfe/errors.py +++ b/src/pytfe/errors.py @@ -423,3 +423,25 @@ class InvalidPolicyCheckIDError(InvalidValues): def __init__(self, message: str = "invalid value for policy check ID"): super().__init__(message) + + +# Policy Set errors +class InvalidPolicySetIDError(InvalidValues): + """Raised when an invalid policy set ID is provided.""" + + def __init__(self, message: str = "invalid value for policy set ID"): + super().__init__(message) + + +class RequiredPoliciesError(RequiredFieldMissing): + """Raised when a required policies field is missing.""" + + def __init__(self, message: str = "policies are required"): + super().__init__(message) + + +class InvalidPoliciesError(InvalidValues): + """Raised when an invalid policies field is provided.""" + + def __init__(self, message: str = "must provide at least one policy"): + super().__init__(message) diff --git a/src/pytfe/models/__init__.py b/src/pytfe/models/__init__.py index cc79a7f..cad12c1 100644 --- a/src/pytfe/models/__init__.py +++ b/src/pytfe/models/__init__.py @@ -19,6 +19,11 @@ AgentTokenListOptions, ) +# ──Apply / Plans ────────────────────────────────────────────────────────────── +from .apply import ( + Apply, +) + # ── Core models split out of old types.py ───────────────────────────────────── # Adjust these imports to match where you placed them during the split. # Common / pagination / enums @@ -86,6 +91,51 @@ ReadRunQueueOptions, RunQueue, ) +from .plan import ( + Plan, +) +from .policy import ( + Policy, + PolicyCreateOptions, + PolicyList, + PolicyListOptions, + PolicyUpdateOptions, +) + +# ── Policy ───────────────────────────────────────────────────────────── +from .policy_check import ( + PolicyActions, + PolicyCheck, + PolicyCheckIncludeOpt, + PolicyCheckList, + PolicyCheckListOptions, + PolicyPermissions, + PolicyResult, + PolicyScope, + PolicyStatus, + PolicyStatusTimestamps, +) +from .policy_set import ( + PolicySet, + PolicySetAddPoliciesOptions, + PolicySetAddProjectsOptions, + PolicySetAddWorkspaceExclusionsOptions, + PolicySetAddWorkspacesOptions, + PolicySetCreateOptions, + PolicySetIncludeOpt, + PolicySetList, + PolicySetListOptions, + PolicySetReadOptions, + PolicySetRemovePoliciesOptions, + PolicySetRemoveProjectsOptions, + PolicySetRemoveWorkspaceExclusionsOptions, + PolicySetRemoveWorkspacesOptions, + PolicySetUpdateOptions, +) +from .policy_types import ( + EnforcementLevel, + PolicyKind, +) from .project import Project # ── Query Runs ──────────────────────────────────────────────────────────────── @@ -158,8 +208,55 @@ # Runs from .run import ( + OrganizationRunList, Run, + RunActions, + RunApplyOptions, + RunCancelOptions, + RunCreateOptions, + RunDiscardOptions, + RunForceCancelOptions, + RunIncludeOpt, + RunList, + RunListForOrganizationOptions, + RunListOptions, + RunOperation, + RunPermissions, + RunReadOptions, + RunSource, RunStatus, + RunStatusTimestamps, + RunVariable, + RunVariableAttr, +) +from .run_event import ( + RunEvent, + RunEventIncludeOpt, + RunEventList, + RunEventListOptions, + RunEventReadOptions, +) +from .run_task import ( + GlobalRunTask, + GlobalRunTaskOptions, + RunTask, + RunTaskCreateOptions, + RunTaskIncludeOptions, + RunTaskList, + RunTaskListOptions, + RunTaskReadOptions, + RunTaskUpdateOptions, + Stage, + TaskEnforcementLevel, +) +from .run_trigger import ( + RunTrigger, + RunTriggerCreateOptions, + RunTriggerFilterOp, + RunTriggerIncludeOp, + RunTriggerList, + RunTriggerListOptions, + SourceableChoice, ) # ── SSH Keys ────────────────────────────────────────────────────────────────── @@ -183,6 +280,7 @@ from .workspace import ( LockedByChoice, VCSRepo, + VCSRepoOptions, Workspace, WorkspaceActions, WorkspaceAddRemoteStateConsumersOptions, @@ -341,6 +439,7 @@ "VariableUpdateOptions", "LockedByChoice", "VCSRepo", + "VCSRepoOptions", "Workspace", "WorkspaceActions", "WorkspaceAddRemoteStateConsumersOptions", @@ -364,8 +463,91 @@ "WorkspaceTagListOptions", "WorkspaceUpdateOptions", "WorkspaceUpdateRemoteStateConsumersOptions", - "Run", "RunQueue", - "RunStatus", "ReadRunQueueOptions", + # Apply & Plans + "Apply", + "Plan", + # Runs + "Run", + "RunStatus", + "RunSource", + "RunIncludeOpt", + "RunOperation", + "RunActions", + "RunPermissions", + "RunStatusTimestamps", + "RunVariable", + "RunVariableAttr", + "RunList", + "RunListOptions", + "OrganizationRunList", + "RunListForOrganizationOptions", + "RunCreateOptions", + "RunReadOptions", + "RunApplyOptions", + "RunCancelOptions", + "RunForceCancelOptions", + "RunDiscardOptions", + # Run events + "RunEvent", + "RunEventIncludeOpt", + "RunEventList", + "RunEventListOptions", + "RunEventReadOptions", + # Run tasks + "RunTask", + "RunTaskIncludeOptions", + "GlobalRunTask", + "GlobalRunTaskOptions", + "Stage", + "TaskEnforcementLevel", + "RunTaskList", + "RunTaskListOptions", + "RunTaskCreateOptions", + "RunTaskUpdateOptions", + "RunTaskReadOptions", + # Run triggers + "RunTrigger", + "RunTriggerCreateOptions", + "RunTriggerList", + "RunTriggerListOptions", + "SourceableChoice", + "RunTriggerFilterOp", + "RunTriggerIncludeOp", + # Policy Checks + "PolicyCheck", + "PolicyCheckIncludeOpt", + "PolicyScope", + "PolicyStatus", + "PolicyActions", + "PolicyPermissions", + "PolicyResult", + "PolicyStatusTimestamps", + "PolicyCheckListOptions", + "PolicyCheckList", + # Policy + "Policy", + "PolicyCreateOptions", + "PolicyList", + "PolicyListOptions", + "PolicyUpdateOptions", + # Policy Sets + "PolicySet", + "PolicySetIncludeOpt", + "PolicySetList", + "PolicySetAddPoliciesOptions", + "PolicySetAddProjectsOptions", + "PolicySetAddWorkspacesOptions", + "PolicySetAddWorkspaceExclusionsOptions", + "PolicySetCreateOptions", + "PolicySetListOptions", + "PolicySetReadOptions", + "PolicySetRemovePoliciesOptions", + "PolicySetRemoveWorkspacesOptions", + "PolicySetRemoveWorkspaceExclusionsOptions", + "PolicySetRemoveProjectsOptions", + "PolicySetUpdateOptions", + "PolicyKind", + "EnforcementLevel", ] diff --git a/src/pytfe/models/policy.py b/src/pytfe/models/policy.py index fc40834..fb182d9 100644 --- a/src/pytfe/models/policy.py +++ b/src/pytfe/models/policy.py @@ -1,19 +1,11 @@ from __future__ import annotations from datetime import datetime -from enum import Enum from pydantic import BaseModel, ConfigDict, Field from .organization import Organization -from .policy_set import PolicyKind - - -class EnforcementLevel(str, Enum): - ENFORCEMENT_ADVISORY = "advisory" - ENFORCEMENT_MANDATORY = "mandatory" - ENFORCEMENT_HARD = "hard-mandatory" - ENFORCEMENT_SOFT = "soft-mandatory" +from .policy_types import EnforcementLevel, PolicyKind class Policy(BaseModel): diff --git a/src/pytfe/models/policy_set.py b/src/pytfe/models/policy_set.py index e9ee8e6..60fccb3 100644 --- a/src/pytfe/models/policy_set.py +++ b/src/pytfe/models/policy_set.py @@ -1,8 +1,164 @@ from __future__ import annotations +from datetime import datetime from enum import Enum +from pydantic import BaseModel, ConfigDict, Field -class PolicyKind(str, Enum): - OPA = "opa" - SENTINEL = "sentinel" +from .organization import Organization +from .policy import Policy +from .policy_set_version import PolicySetVersion +from .policy_types import PolicyKind +from .project import Project +from .workspace import VCSRepo, VCSRepoOptions, Workspace + + +class PolicySetIncludeOpt(str, Enum): + POLICY_SET_POLICIES = "policies" + POLICY_SET_WORKSPACES = "workspaces" + POLICY_SET_PROJECTS = "projects" + POLICY_SET_NEWEST_VERSION = "newest_version" + POLICY_SET_CURRENT_VERSION = "current_version" + POLICY_SET_WORKSPACE_EXCLUSIONS = "workspace_exclusions" + + +class PolicySet(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + id: str + name: str | None = Field(None, alias="name") + description: str | None = Field(None, alias="description") + kind: PolicyKind | None = Field(None, alias="kind") + overridable: bool | None = Field(None, alias="overridable") + Global: bool | None = Field(None, alias="global") + policies_path: str | None = Field(None, alias="policies-path") + + # **Note: This field is still in BETA and subject to change.** + policy_count: int | None = Field(None, alias="policy-count") + vcs_repo: VCSRepo | None = Field(None, alias="vcs-repo") + workspace_count: int | None = Field(None, alias="workspace-count") + project_count: int | None = Field(None, alias="project-count") + created_at: datetime | None = Field(None, alias="created-at") + updated_at: datetime | None = Field(None, alias="updated-at") + agent_enabled: bool | None = Field(None, alias="agent-enabled") + policy_tool_version: str | None = Field(None, alias="policy-tool-version") + + # Relations + organization: Organization | None = Field(None, alias="organization") + workspaces: list[Workspace] = Field(default_factory=list, alias="workspaces") + projects: list[Project] = Field(default_factory=list, alias="projects") + policies: list[Policy] = Field(default_factory=list, alias="policies") + newest_version: PolicySetVersion | None = Field(None, alias="newest-version") + current_version: PolicySetVersion | None = Field(None, alias="current-version") + workspace_exclusions: list[Workspace] = Field( + default_factory=list, alias="workspace-exclusions" + ) + + +class PolicySetList(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + items: list[PolicySet] = Field(default_factory=list) + current_page: int | None = None + total_pages: int | None = None + prev_page: int | None = None + next_page: int | None = None + total_count: int | None = None + + +class PolicySetListOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + search: str | None = Field(None, alias="search[name]") + kind: PolicyKind | None = Field(None, alias="filter[kind]") + include: list[PolicySetIncludeOpt] | None = Field(None, alias="include") + page_number: int | None = Field(None, alias="page[number]") + page_size: int | None = Field(None, alias="page[size]") + + +class PolicySetReadOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + include: list[PolicySetIncludeOpt] | None = Field(None, alias="include") + + +class PolicySetCreateOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + name: str = Field(..., alias="name") + description: str | None = Field(None, alias="description") + Global: bool | None = Field(None, alias="global") + kind: PolicyKind | None = Field(None, alias="kind") + overridable: bool | None = Field(None, alias="overridable") + agent_enabled: bool | None = Field(None, alias="agent-enabled") + policy_tool_version: str | None = Field(None, alias="policy-tool-version") + policies_path: str | None = Field(None, alias="policies-path") + policies: list[Policy] | None = Field(None, alias="policies") + + vcs_repo: VCSRepoOptions | None = Field(None, alias="vcs-repo") + workspaces: list[Workspace] | None = Field(None, alias="workspaces") + projects: list[Project] | None = Field(None, alias="projects") + workspace_exclusions: list[Workspace] | None = Field( + None, alias="workspace-exclusions" + ) + + +class PolicySetUpdateOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + name: str | None = Field(None, alias="name") + description: str | None = Field(None, alias="description") + Global: bool | None = Field(None, alias="global") + overridable: bool | None = Field(None, alias="overridable") + agent_enabled: bool | None = Field(None, alias="agent-enabled") + policy_tool_version: str | None = Field(None, alias="policy-tool-version") + policies_path: str | None = Field(None, alias="policies-path") + vcs_repo: VCSRepoOptions | None = Field(None, alias="vcs-repo") + + +class PolicySetAddPoliciesOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + policies: list[Policy] = Field(default_factory=list) + + +class PolicySetRemovePoliciesOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + policies: list[Policy] = Field(default_factory=list) + + +class PolicySetAddWorkspacesOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + workspaces: list[Workspace] = Field(default_factory=list) + + +class PolicySetRemoveWorkspacesOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + workspaces: list[Workspace] = Field(default_factory=list) + + +class PolicySetAddWorkspaceExclusionsOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + workspace_exclusions: list[Workspace] = Field(default_factory=list) + + +class PolicySetRemoveWorkspaceExclusionsOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + workspace_exclusions: list[Workspace] = Field(default_factory=list) + + +class PolicySetAddProjectsOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + projects: list[Project] = Field(default_factory=list) + + +class PolicySetRemoveProjectsOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + projects: list[Project] = Field(default_factory=list) diff --git a/src/pytfe/models/policy_set_version.py b/src/pytfe/models/policy_set_version.py new file mode 100644 index 0000000..8b8b0d7 --- /dev/null +++ b/src/pytfe/models/policy_set_version.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +from pydantic import BaseModel, ConfigDict, Field + + +class PolicySetVersion(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + id: str + error: str | None = Field(None, alias="error") diff --git a/src/pytfe/models/policy_types.py b/src/pytfe/models/policy_types.py new file mode 100644 index 0000000..5a5c6b5 --- /dev/null +++ b/src/pytfe/models/policy_types.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +from enum import Enum + + +class PolicyKind(str, Enum): + """The kind of policy - shared between Policy and PolicySet models.""" + + OPA = "opa" + SENTINEL = "sentinel" + + +class EnforcementLevel(str, Enum): + """Policy enforcement levels.""" + + ENFORCEMENT_ADVISORY = "advisory" + ENFORCEMENT_MANDATORY = "mandatory" + ENFORCEMENT_HARD = "hard-mandatory" + ENFORCEMENT_SOFT = "soft-mandatory" diff --git a/src/pytfe/models/workspace.py b/src/pytfe/models/workspace.py index 2205621..3857241 100644 --- a/src/pytfe/models/workspace.py +++ b/src/pytfe/models/workspace.py @@ -338,3 +338,12 @@ class VCSRepo(BaseModel): oauth_token_id: str | None = None tags_regex: str | None = None gha_installation_id: str | None = None + + +class VCSRepoOptions(BaseModel): + branch: str | None = None + identifier: str | None = None + ingress_submodules: bool | None = None + oauth_token_id: str | None = None + tags_regex: str | None = None + gha_installation_id: str | None = None diff --git a/src/pytfe/resources/policy_set.py b/src/pytfe/resources/policy_set.py new file mode 100644 index 0000000..28bf87d --- /dev/null +++ b/src/pytfe/resources/policy_set.py @@ -0,0 +1,461 @@ +from __future__ import annotations + +from ..errors import ( + InvalidNameError, + InvalidOrgError, + InvalidPoliciesError, + InvalidPolicySetIDError, + RequiredNameError, + RequiredPoliciesError, + WorkspaceMinimumLimitError, + WorkspaceRequiredError, +) +from ..models.policy_set import ( + PolicySet, + PolicySetAddPoliciesOptions, + PolicySetAddProjectsOptions, + PolicySetAddWorkspaceExclusionsOptions, + PolicySetAddWorkspacesOptions, + PolicySetCreateOptions, + PolicySetList, + PolicySetListOptions, + PolicySetReadOptions, + PolicySetRemovePoliciesOptions, + PolicySetRemoveProjectsOptions, + PolicySetRemoveWorkspaceExclusionsOptions, + PolicySetRemoveWorkspacesOptions, + PolicySetUpdateOptions, +) +from ..utils import valid_string, valid_string_id +from ._base import _Service + + +class PolicySets(_Service): + """ + PolicySets describes all the policy set related methods that the Terraform Enterprise API supports. + TFE API docs: https://developer.hashicorp.com/terraform/cloud-docs/api-docs/policy-sets + """ + + def list( + self, organization: str, options: PolicySetListOptions | None = None + ) -> PolicySetList: + """List all the policy sets of the given organization.""" + if not valid_string_id(organization): + raise InvalidOrgError() + params = options.model_dump(by_alias=True, exclude_none=True) if options else {} + r = self.t.request( + "GET", + f"/api/v2/organizations/{organization}/policy-sets", + params=params, + ) + jd = r.json() + items = [] + meta = jd.get("meta", {}) + pagination = meta.get("pagination", {}) + for d in jd.get("data", []): + attrs = d.get("attributes", {}) + attrs["id"] = d.get("id") + attrs["organization"] = d.get("relationships", {}).get("organization", {}) + attrs["workspace_exclusions"] = ( + d.get("relationships", {}) + .get("workspace-exclusions", {}) + .get("data", []) + ) + attrs["workspaces"] = ( + d.get("relationships", {}).get("workspaces", {}).get("data", []) + ) + attrs["projects"] = ( + d.get("relationships", {}).get("projects", {}).get("data", []) + ) + attrs["policies"] = ( + d.get("relationships", {}).get("policies", {}).get("data", []) + ) + items.append(PolicySet.model_validate(attrs)) + return PolicySetList( + items=items, + current_page=pagination.get("current-page"), + total_pages=pagination.get("total-pages"), + prev_page=pagination.get("prev-page"), + next_page=pagination.get("next-page"), + total_count=pagination.get("total-count"), + ) + + def create(self, organization: str, options: PolicySetCreateOptions) -> PolicySet: + """Create a new policy set in the given organization.""" + if not valid_string_id(organization): + raise InvalidOrgError() + if not valid_string(options.name): + raise RequiredNameError() + if not valid_string_id(options.name): + raise InvalidNameError() + + # Separate attributes from relationships + options_dict = options.model_dump(by_alias=True, exclude_none=True) + + # Relationship fields that go under relationships + relationship_fields = [ + "workspaces", + "projects", + "workspace-exclusions", + "policies", + ] + relationships = {} + attributes = {} + + for key, value in options_dict.items(): + if key in relationship_fields: + # Convert the relationship data to the proper JSON:API format + if value: # Only add if not None/empty + relationships[key] = { + "data": [ + {"id": item.id, "type": self._get_relationship_type(key)} + for item in value + ] + } + else: + attributes[key] = value + + if not attributes: + raise ValueError("No attributes provided to create a policy set") + + payload = { + "data": { + "attributes": attributes, + "type": "policy-sets", + } + } + + # Only add relationships if they exist + if relationships: + payload["data"]["relationships"] = relationships + + r = self.t.request( + "POST", + f"/api/v2/organizations/{organization}/policy-sets", + json_body=payload, + ) + jd = r.json() + data = jd.get("data", {}) + attrs = data.get("attributes", {}) + attrs["id"] = data.get("id") + + # Handle relationships in response + relationships_data = data.get("relationships", {}) + attrs["organization"] = relationships_data.get("organization", {}) + attrs["workspace_exclusions"] = relationships_data.get( + "workspace-exclusions", {} + ).get("data", []) + attrs["workspaces"] = relationships_data.get("workspaces", {}).get("data", []) + attrs["projects"] = relationships_data.get("projects", {}).get("data", []) + attrs["policies"] = relationships_data.get("policies", {}).get("data", []) + + return PolicySet.model_validate(attrs) + + def read(self, policy_set_id: str) -> PolicySet: + """Read a policy set by its ID.""" + return self.read_with_options(policy_set_id, None) + + def read_with_options( + self, policy_set_id: str, options: PolicySetReadOptions | None = None + ) -> PolicySet: + """Read a policy set by its ID with additional options.""" + if not valid_string_id(policy_set_id): + raise InvalidPolicySetIDError() + + params = ( + options.model_dump(by_alias=True, exclude_none=True) if options else None + ) + + r = self.t.request( + "GET", + f"/api/v2/policy-sets/{policy_set_id}", + params=params, + ) + jd = r.json() + data = jd.get("data", {}) + attrs = data.get("attributes", {}) + attrs["id"] = data.get("id") + + # Handle relationships in response + relationships_data = data.get("relationships", {}) + attrs["organization"] = relationships_data.get("organization", {}) + attrs["workspace_exclusions"] = relationships_data.get( + "workspace-exclusions", {} + ).get("data", []) + attrs["workspaces"] = relationships_data.get("workspaces", {}).get("data", []) + attrs["projects"] = relationships_data.get("projects", {}).get("data", []) + attrs["policies"] = relationships_data.get("policies", {}).get("data", []) + + return PolicySet.model_validate(attrs) + + def update(self, policy_set_id: str, options: PolicySetUpdateOptions) -> PolicySet: + """Update an existing policy set.""" + if not valid_string_id(policy_set_id): + raise InvalidPolicySetIDError() + + attrs = options.model_dump(by_alias=True, exclude_none=True) + if not attrs: + raise ValueError("No attributes provided to update the policy set") + + payload = { + "data": { + "attributes": attrs, + "type": "policy-sets", + "id": policy_set_id, + } + } + + r = self.t.request( + "PATCH", + f"/api/v2/policy-sets/{policy_set_id}", + json_body=payload, + ) + jd = r.json() + data = jd.get("data", {}) + attrs = data.get("attributes", {}) + attrs["id"] = data.get("id") + + # Handle relationships in response + relationships_data = data.get("relationships", {}) + attrs["organization"] = relationships_data.get("organization", {}) + attrs["workspace_exclusions"] = relationships_data.get( + "workspace-exclusions", {} + ).get("data", []) + attrs["workspaces"] = relationships_data.get("workspaces", {}).get("data", []) + attrs["projects"] = relationships_data.get("projects", {}).get("data", []) + attrs["policies"] = relationships_data.get("policies", {}).get("data", []) + + return PolicySet.model_validate(attrs) + + def add_policies( + self, policy_set_id: str, options: PolicySetAddPoliciesOptions + ) -> None: + """Add policies to a policy set.""" + if not valid_string_id(policy_set_id): + raise InvalidPolicySetIDError() + + if not options.policies: + raise RequiredPoliciesError() + + if len(options.policies) == 0: + raise InvalidPoliciesError() + + payload = { + "data": [ + {"id": policy.id, "type": "policies"} for policy in options.policies + ] + } + + self.t.request( + "POST", + f"/api/v2/policy-sets/{policy_set_id}/relationships/policies", + json_body=payload, + ) + return None + + def remove_policies( + self, policy_set_id: str, options: PolicySetRemovePoliciesOptions + ) -> None: + """Remove policies from a policy set.""" + if not valid_string_id(policy_set_id): + raise InvalidPolicySetIDError() + + if not options.policies: + raise RequiredPoliciesError() + + if len(options.policies) == 0: + raise InvalidPoliciesError() + + payload = { + "data": [ + {"id": policy.id, "type": "policies"} for policy in options.policies + ] + } + + self.t.request( + "DELETE", + f"/api/v2/policy-sets/{policy_set_id}/relationships/policies", + json_body=payload, + ) + return None + + def add_workspaces( + self, policy_set_id: str, options: PolicySetAddWorkspacesOptions + ) -> None: + """Add workspaces to a policy set.""" + if not valid_string_id(policy_set_id): + raise InvalidPolicySetIDError() + + if not options.workspaces: + raise WorkspaceRequiredError() + + if len(options.workspaces) == 0: + raise WorkspaceMinimumLimitError() + + payload = { + "data": [ + {"id": workspace.id, "type": "workspaces"} + for workspace in options.workspaces + ] + } + + self.t.request( + "POST", + f"/api/v2/policy-sets/{policy_set_id}/relationships/workspaces", + json_body=payload, + ) + return None + + def remove_workspaces( + self, policy_set_id: str, options: PolicySetRemoveWorkspacesOptions + ) -> None: + """Remove workspaces from a policy set.""" + if not valid_string_id(policy_set_id): + raise InvalidPolicySetIDError() + + if not options.workspaces: + raise WorkspaceRequiredError() + + if len(options.workspaces) == 0: + raise WorkspaceMinimumLimitError() + + payload = { + "data": [ + {"id": workspace.id, "type": "workspaces"} + for workspace in options.workspaces + ] + } + + self.t.request( + "DELETE", + f"/api/v2/policy-sets/{policy_set_id}/relationships/workspaces", + json_body=payload, + ) + return None + + def add_workspace_exclusions( + self, policy_set_id: str, options: PolicySetAddWorkspaceExclusionsOptions + ) -> None: + """Add workspace exclusions to a policy set.""" + if not valid_string_id(policy_set_id): + raise InvalidPolicySetIDError() + + if not options.workspace_exclusions: + raise WorkspaceRequiredError() + + if len(options.workspace_exclusions) == 0: + raise WorkspaceMinimumLimitError() + + payload = { + "data": [ + {"id": workspace.id, "type": "workspaces"} + for workspace in options.workspace_exclusions + ] + } + + self.t.request( + "POST", + f"/api/v2/policy-sets/{policy_set_id}/relationships/workspace-exclusions", + json_body=payload, + ) + return None + + def remove_workspace_exclusions( + self, policy_set_id: str, options: PolicySetRemoveWorkspaceExclusionsOptions + ) -> None: + """Remove workspace exclusions from a policy set.""" + if not valid_string_id(policy_set_id): + raise InvalidPolicySetIDError() + + if not options.workspace_exclusions: + raise WorkspaceRequiredError() + + if len(options.workspace_exclusions) == 0: + raise WorkspaceMinimumLimitError() + + payload = { + "data": [ + {"id": workspace.id, "type": "workspaces"} + for workspace in options.workspace_exclusions + ] + } + + self.t.request( + "DELETE", + f"/api/v2/policy-sets/{policy_set_id}/relationships/workspace-exclusions", + json_body=payload, + ) + return None + + def add_projects( + self, policy_set_id: str, options: PolicySetAddProjectsOptions + ) -> None: + """Add projects to a policy set.""" + if not valid_string_id(policy_set_id): + raise InvalidPolicySetIDError() + + if not options.projects: + raise ValueError("project is required") + + if len(options.projects) == 0: + raise ValueError("must provide at least one project") + + payload = { + "data": [ + {"id": project.id, "type": "projects"} for project in options.projects + ] + } + + self.t.request( + "POST", + f"/api/v2/policy-sets/{policy_set_id}/relationships/projects", + json_body=payload, + ) + return None + + def remove_projects( + self, policy_set_id: str, options: PolicySetRemoveProjectsOptions + ) -> None: + """Remove projects from a policy set.""" + if not valid_string_id(policy_set_id): + raise InvalidPolicySetIDError() + + if not options.projects: + raise ValueError("project is required") + + if len(options.projects) == 0: + raise ValueError("must provide at least one project") + + payload = { + "data": [ + {"id": project.id, "type": "projects"} for project in options.projects + ] + } + + self.t.request( + "DELETE", + f"/api/v2/policy-sets/{policy_set_id}/relationships/projects", + json_body=payload, + ) + return None + + def delete(self, policy_set_id: str) -> None: + """Delete a policy set by its ID.""" + if not valid_string_id(policy_set_id): + raise InvalidPolicySetIDError() + + self.t.request( + "DELETE", + f"/api/v2/policy-sets/{policy_set_id}", + ) + return None + + def _get_relationship_type(self, field_name: str) -> str: + """Get the JSON:API type for relationship fields.""" + type_mapping = { + "workspaces": "workspaces", + "projects": "projects", + "workspace-exclusions": "workspaces", + "policies": "policies", + } + return type_mapping.get(field_name, field_name)