Skip to content

Commit 862a0ce

Browse files
isivaselvanKshitijaChoudhari
authored andcommitted
Features providing Policy Check API Specs (#42)
1 parent db48cd9 commit 862a0ce

5 files changed

Lines changed: 390 additions & 9 deletions

File tree

examples/policy_check.py

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
from __future__ import annotations
2+
3+
import argparse
4+
import os
5+
6+
from tfe import TFEClient, TFEConfig
7+
from tfe.models.policy_check import PolicyCheckListOptions
8+
9+
10+
def _print_header(title: str):
11+
print("\n" + "=" * 80)
12+
print(title)
13+
print("=" * 80)
14+
15+
16+
def main():
17+
parser = argparse.ArgumentParser(
18+
description="Policy Checks demo for python-tfe SDK"
19+
)
20+
parser.add_argument(
21+
"--address", default=os.getenv("TFE_ADDRESS", "https://app.terraform.io")
22+
)
23+
parser.add_argument("--token", default=os.getenv("TFE_TOKEN", ""))
24+
parser.add_argument(
25+
"--run-id", required=True, help="Run ID to list policy checks for"
26+
)
27+
parser.add_argument(
28+
"--policy-check-id", help="Specific policy check ID to read/override"
29+
)
30+
parser.add_argument(
31+
"--override", action="store_true", help="Override the specified policy check"
32+
)
33+
parser.add_argument(
34+
"--get-logs",
35+
action="store_true",
36+
help="Get logs for the specified policy check",
37+
)
38+
parser.add_argument("--page", type=int, default=1)
39+
parser.add_argument("--page-size", type=int, default=20)
40+
args = parser.parse_args()
41+
42+
if not args.token:
43+
print("Error: TFE_TOKEN environment variable or --token argument is required")
44+
return
45+
46+
cfg = TFEConfig(address=args.address, token=args.token)
47+
client = TFEClient(cfg)
48+
49+
# 1) List all policy checks for the given run
50+
_print_header(f"Listing policy checks for run: {args.run_id}")
51+
52+
options = PolicyCheckListOptions(
53+
page_number=args.page,
54+
page_size=args.page_size,
55+
)
56+
57+
try:
58+
pc_list = client.policy_checks.list(args.run_id, options)
59+
60+
print(f"Total policy checks: {pc_list.total_count}")
61+
print(f"Page {pc_list.current_page} of {pc_list.total_pages}")
62+
print()
63+
64+
if not pc_list.items:
65+
print("No policy checks found for this run.")
66+
else:
67+
for pc in pc_list.items:
68+
print(f"- ID: {pc.id}")
69+
print(f" Status: {pc.status}")
70+
print(f" Scope: {pc.scope}")
71+
if pc.result:
72+
print(
73+
f" Result: passed={pc.result.passed}, failed={pc.result.total_failed}"
74+
)
75+
print(f" Duration: {pc.result.duration}ms")
76+
if pc.actions:
77+
print(f" Can Override: {pc.actions.is_overridable}")
78+
if pc.permissions:
79+
print(f" Has Override Permission: {pc.permissions.can_override}")
80+
print()
81+
82+
except Exception as e:
83+
print(f"Error listing policy checks: {e}")
84+
return
85+
86+
# 2) Read a specific policy check (if policy-check-id is provided)
87+
if args.policy_check_id:
88+
_print_header(f"Reading policy check: {args.policy_check_id}")
89+
90+
try:
91+
pc = client.policy_checks.read(args.policy_check_id)
92+
93+
print(f"ID: {pc.id}")
94+
print(f"Status: {pc.status}")
95+
print(f"Scope: {pc.scope}")
96+
97+
if pc.result:
98+
print("Result Summary:")
99+
print(f" - Passed: {pc.result.passed}")
100+
print(f" - Hard Failed: {pc.result.hard_failed}")
101+
print(f" - Soft Failed: {pc.result.soft_failed}")
102+
print(f" - Advisory Failed: {pc.result.advisory_failed}")
103+
print(f" - Total Failed: {pc.result.total_failed}")
104+
print(f" - Duration: {pc.result.duration}ms")
105+
print(f" - Overall Result: {pc.result.result}")
106+
107+
if pc.actions:
108+
print("Actions:")
109+
print(f" - Is Overridable: {pc.actions.is_overridable}")
110+
111+
if pc.permissions:
112+
print("Permissions:")
113+
print(f" - Can Override: {pc.permissions.can_override}")
114+
115+
if pc.status_timestamps:
116+
print("Status Timestamps:")
117+
if pc.status_timestamps.queued_at:
118+
print(f" - Queued At: {pc.status_timestamps.queued_at}")
119+
if pc.status_timestamps.passed_at:
120+
print(f" - Passed At: {pc.status_timestamps.passed_at}")
121+
if pc.status_timestamps.soft_failed_at:
122+
print(f" - Soft Failed At: {pc.status_timestamps.soft_failed_at}")
123+
if pc.status_timestamps.hard_failed_at:
124+
print(f" - Hard Failed At: {pc.status_timestamps.hard_failed_at}")
125+
if pc.status_timestamps.errored_at:
126+
print(f" - Errored At: {pc.status_timestamps.errored_at}")
127+
128+
except Exception as e:
129+
print(f"Error reading policy check: {e}")
130+
return
131+
132+
# 3) Override the policy check (if requested and possible)
133+
if args.override:
134+
_print_header(f"Overriding policy check: {args.policy_check_id}")
135+
136+
try:
137+
overridden_pc = client.policy_checks.override(args.policy_check_id)
138+
print(f"Policy check {overridden_pc.id} successfully overridden!")
139+
print(f"New status: {overridden_pc.status}")
140+
141+
except Exception as e:
142+
print(f"Error overriding policy check: {e}")
143+
144+
# 4) Get logs for the policy check (if requested)
145+
if args.get_logs:
146+
_print_header(f"Getting logs for policy check: {args.policy_check_id}")
147+
148+
try:
149+
print(
150+
"Fetching logs (this may take a moment if the policy check is still running)..."
151+
)
152+
logs = client.policy_checks.logs(args.policy_check_id)
153+
154+
print("Policy Check Logs:")
155+
print("-" * 60)
156+
print(logs)
157+
print("-" * 60)
158+
159+
except Exception as e:
160+
print(f"Error getting policy check logs: {e}")
161+
162+
163+
if __name__ == "__main__":
164+
main()

src/tfe/client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from .resources.organizations import Organizations
1313
from .resources.plan import Plans
1414
from .resources.policy import Policies
15+
from .resources.policy_check import PolicyChecks
1516
from .resources.projects import Projects
1617
from .resources.query_run import QueryRuns
1718
from .resources.registry_module import RegistryModules
@@ -76,6 +77,7 @@ def __init__(self, config: TFEConfig | None = None):
7677
self.query_runs = QueryRuns(self._transport)
7778
self.run_events = RunEvents(self._transport)
7879
self.policies = Policies(self._transport)
80+
self.policy_checks = PolicyChecks(self._transport)
7981

8082
# SSH Keys
8183
self.ssh_keys = SSHKeys(self._transport)

src/tfe/errors.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,3 +415,11 @@ class RequiredEnforceError(RequiredFieldMissing):
415415

416416
def __init__(self, message: str = "enforce or enforcement-level is required"):
417417
super().__init__(message)
418+
419+
420+
# Policy Check errors
421+
class InvalidPolicyCheckIDError(InvalidValues):
422+
"""Raised when an invalid policy check ID is provided."""
423+
424+
def __init__(self, message: str = "invalid value for policy check ID"):
425+
super().__init__(message)

src/tfe/models/policy_check.py

Lines changed: 108 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,123 @@
11
from __future__ import annotations
22

3-
from typing import TYPE_CHECKING
3+
from datetime import datetime
4+
from enum import Enum
5+
from typing import TYPE_CHECKING, Any
46

57
from pydantic import BaseModel, ConfigDict, Field
68

79
if TYPE_CHECKING:
810
from .run import Run
911

1012

11-
# PolicyCheck represents a Terraform Enterprise policy check..
13+
class PolicyScope(str, Enum):
14+
"""The scope of the policy check."""
15+
16+
POLICY_SCOPE_ORGANIZATION = "organization"
17+
POLICY_SCOPE_WORKSPACE = "workspace"
18+
19+
20+
class PolicyStatus(str, Enum):
21+
"""The status of the policy check."""
22+
23+
POLICY_CANCELED = "canceled"
24+
POLICY_ERRORED = "errored"
25+
POLICY_HARD_FAILED = "hard_failed"
26+
POLICY_OVERRIDDEN = "overridden"
27+
POLICY_PASSES = "passed"
28+
POLICY_PENDING = "pending"
29+
POLICY_QUEUED = "queued"
30+
POLICY_SOFT_FAILED = "soft_failed"
31+
POLICY_UNREACHABLE = "unreachable"
32+
33+
34+
class PolicyCheckIncludeOpt(str, Enum):
35+
"""A list of relations to include"""
36+
37+
POLICY_CHECK_RUN_WORKSPACE = "run.workspace"
38+
POLICY_CHECK_RUN = "run"
39+
40+
1241
class PolicyCheck(BaseModel):
42+
"""PolicyCheck represents a Terraform Enterprise policy check."""
43+
1344
model_config = ConfigDict(populate_by_name=True, validate_by_name=True)
1445

1546
id: str
16-
# actions: PolicyActions = Field(..., alias="actions")
17-
# permissions: PolicyPermissions = Field(..., alias="permissions")
18-
# result: PolicyResult = Field(..., alias="result")
19-
# scope: PolicyScope = Field(..., alias="scope")
20-
# status: PolicyStatus = Field(..., alias="status")
21-
# status_timestamps: PolicyStatusTimestamps = Field(..., alias="status-timestamps")
47+
actions: PolicyActions | None = Field(None, alias="actions")
48+
permissions: PolicyPermissions | None = Field(None, alias="permissions")
49+
result: PolicyResult | None = Field(None, alias="result")
50+
scope: PolicyScope | None = Field(None, alias="scope")
51+
status: PolicyStatus | None = Field(None, alias="status")
52+
status_timestamps: PolicyStatusTimestamps | None = Field(
53+
None, alias="status-timestamps"
54+
)
2255

2356
# Relations
24-
run: Run = Field(..., alias="run")
57+
run: Run | None = Field(None, alias="run")
58+
59+
60+
class PolicyActions(BaseModel):
61+
"""PolicyActions represents the policy check actions."""
62+
63+
model_config = ConfigDict(populate_by_name=True, validate_by_name=True)
64+
65+
is_overridable: bool | None = Field(None, alias="is-overridable")
66+
67+
68+
class PolicyPermissions(BaseModel):
69+
"""PolicyPermissions represents the policy check permissions."""
70+
71+
model_config = ConfigDict(populate_by_name=True, validate_by_name=True)
72+
73+
can_override: bool | None = Field(None, alias="can-override")
74+
75+
76+
class PolicyResult(BaseModel):
77+
"""PolicyResult represents the complete policy check result"""
78+
79+
model_config = ConfigDict(populate_by_name=True, validate_by_name=True)
80+
81+
advisory_failed: int | None = Field(None, alias="advisory-failed")
82+
duration: int | None = Field(None, alias="duration")
83+
hard_failed: int | None = Field(None, alias="hard-failed")
84+
soft_failed: int | None = Field(None, alias="soft-failed")
85+
total_failed: int | None = Field(None, alias="total-failed")
86+
passed: int | None = Field(None, alias="passed")
87+
result: bool | None = Field(None, alias="result")
88+
sentinel: Any | None = Field(None, alias="sentinel")
89+
90+
91+
class PolicyStatusTimestamps(BaseModel):
92+
"""PolicyStatusTimestamps holds the timestamps for individual policy check statuses."""
93+
94+
model_config = ConfigDict(populate_by_name=True, validate_by_name=True)
95+
96+
errored_at: datetime | None = Field(None, alias="errored-at")
97+
hard_failed_at: datetime | None = Field(None, alias="hard-failed-at")
98+
passed_at: datetime | None = Field(None, alias="passed-at")
99+
queued_at: datetime | None = Field(None, alias="queued-at")
100+
soft_failed_at: datetime | None = Field(None, alias="soft-failed-at")
101+
102+
103+
class PolicyCheckListOptions(BaseModel):
104+
"""PolicyCheckListOptions represents the options for listing policy checks."""
105+
106+
model_config = ConfigDict(populate_by_name=True, validate_by_name=True)
107+
108+
include: list[PolicyCheckIncludeOpt] | None = Field(None, alias="include")
109+
page_number: int | None = Field(None, alias="page[number]")
110+
page_size: int | None = Field(None, alias="page[size]")
111+
112+
113+
class PolicyCheckList(BaseModel):
114+
"""PolicyCheckList represents a list of policy checks."""
115+
116+
model_config = ConfigDict(populate_by_name=True, validate_by_name=True)
117+
118+
items: list[PolicyCheck] = Field(default_factory=list, alias="items")
119+
current_page: int | None = Field(None, alias="current_page")
120+
total_pages: int | None = Field(None, alias="total_pages")
121+
prev_page: int | None = Field(None, alias="prev_page")
122+
next_page: int | None = Field(None, alias="next_page")
123+
total_count: int | None = Field(None, alias="total_count")

0 commit comments

Comments
 (0)