diff --git a/scripts/check_globus_endpoint_sync.py b/scripts/check_globus_endpoint_sync.py new file mode 100644 index 00000000..49a09966 --- /dev/null +++ b/scripts/check_globus_endpoint_sync.py @@ -0,0 +1,381 @@ +#!/usr/bin/env python +""" +Compare files between two Globus endpoints and identify missing files. + +This script lists files on a source and destination endpoint, then reports +which files exist on source but not on destination. + +Usage: + # Using endpoint names from config.yml + python -m scripts.check_globus_endpoint_sync \ --source spot832 \ --source-path /raw/2024/01 \ --dest data832 \ --dest-path /raw/2024/01 + + # Using raw UUIDs (for endpoints not in config.yml) + python -m scripts.check_globus_endpoint_sync \ --source-uuid abc123-def456 \ --source-path /raw/2024/01 \ --dest-uuid xyz789-... \ --dest-path /raw/2024/01 + + # Save missing files to a file + python -m scripts.check_globus_endpoint_sync \ --source spot832 --source-path /raw/2024/01 \ --dest data832 --dest-path /raw/2024/01 \ --output missing_files.txt + + # List available endpoints + python -m scripts.check_globus_endpoint_sync --list-endpoints + + Example: + python -m scripts.check_globus_endpoint_sync \ --source spot832 --source-path raw/_bls-00739_parkinson \ --dest data832_raw --dest-path data/raw/_bls-00739_parkinson + +============================================================ +Source: spot832 (/raw/_bls-00739_parkinson) +Destination: data832_raw (/data/raw/_bls-00739_parkinson) +============================================================ +Files on source: 21 +Files on destination: 21 +Missing from dest: 0 +============================================================ + +✓ All files are synced! No missing files found. 
+""" +from dotenv import load_dotenv +import json +import logging +import os +from pathlib import Path +from typing import Set, List, Optional +import uuid + +import globus_sdk +import typer + +from orchestration.config import get_config +from orchestration.globus.transfer import build_endpoints + +load_dotenv() + +logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") +logger = logging.getLogger(__name__) + +TOKEN_FILE = Path.home() / ".globus_sync_check_tokens.json" + + +def get_transfer_client() -> globus_sdk.TransferClient: + """ + Get a Globus TransferClient. + + Uses confidential client if GLOBUS_CLIENT_ID and GLOBUS_CLIENT_SECRET are set, + otherwise uses cached tokens or prompts for browser-based login. + + :returns: Authenticated TransferClient + """ + client_id = os.getenv("GLOBUS_CLIENT_ID") + client_secret = os.getenv("GLOBUS_CLIENT_SECRET") + scopes = "urn:globus:auth:scope:transfer.api.globus.org:all" + + # If we have both client ID and secret, use confidential client + if client_id and client_secret: + logger.info("Using confidential client credentials") + confidential_client = globus_sdk.ConfidentialAppAuthClient(client_id, client_secret) + authorizer = globus_sdk.ClientCredentialsAuthorizer(confidential_client, scopes) + return globus_sdk.TransferClient(authorizer=authorizer) + + # Otherwise, use native app auth with browser login + native_client_id = client_id or "61338d24-54d5-408f-a10d-66c06b59f6d2" # Default Globus native client ID + client = globus_sdk.NativeAppAuthClient(native_client_id) + + # Check for cached tokens + if TOKEN_FILE.exists(): + try: + with open(TOKEN_FILE) as f: + tokens = json.load(f) + transfer_tokens = tokens.get("transfer.api.globus.org", {}) + if transfer_tokens.get("refresh_token"): + logger.info("Using cached tokens") + authorizer = globus_sdk.RefreshTokenAuthorizer( + transfer_tokens["refresh_token"], + client, + access_token=transfer_tokens.get("access_token"), + 
expires_at=transfer_tokens.get("expires_at_seconds"), + on_refresh=_save_tokens, + ) + return globus_sdk.TransferClient(authorizer=authorizer) + except (json.JSONDecodeError, KeyError) as e: + logger.warning(f"Could not load cached tokens: {e}") + + # No valid cached tokens, do browser login + logger.info("No cached tokens, using browser login") + client.oauth2_start_flow(refresh_tokens=True, requested_scopes=scopes) + authorize_url = client.oauth2_get_authorize_url() + + print(f"\nPlease visit this URL to authenticate:\n\n{authorize_url}\n") + auth_code = input("Enter the authorization code: ").strip() + + token_response = client.oauth2_exchange_code_for_tokens(auth_code) + _save_tokens(token_response) + + transfer_tokens = token_response.by_resource_server["transfer.api.globus.org"] + authorizer = globus_sdk.RefreshTokenAuthorizer( + transfer_tokens["refresh_token"], + client, + access_token=transfer_tokens["access_token"], + expires_at=transfer_tokens["expires_at_seconds"], + on_refresh=_save_tokens, + ) + return globus_sdk.TransferClient(authorizer=authorizer) + + +def _save_tokens(token_response) -> None: + """ + Save tokens to file for reuse. + + :param token_response: The token response from Globus SDK + :returns: None + """ + if hasattr(token_response, "by_resource_server"): + tokens = token_response.by_resource_server + else: + tokens = token_response + with open(TOKEN_FILE, "w") as f: + json.dump(tokens, f) + TOKEN_FILE.chmod(0o600) + + +def _looks_like_uuid(val: str) -> bool: + try: + uuid.UUID(val) + return True + except ValueError: + return False + + +def resolve_endpoint_uuid(name_or_uuid: str) -> str: + """ + Resolve an endpoint name from config.yml to its UUID. + If it looks like a UUID already, return as-is. 
+ + :param name_or_uuid: Endpoint name or UUID + :returns: Endpoint UUID + :raises ValueError: If name not found in config.yml + """ + # If it contains dashes and is long, assume it's already a UUID + if _looks_like_uuid(name_or_uuid): + return name_or_uuid + + # Otherwise, look up in config + config = get_config() + endpoints = build_endpoints(config) + if name_or_uuid not in endpoints: + available = ", ".join(sorted(endpoints.keys())) + raise ValueError( + f"Endpoint '{name_or_uuid}' not found in config.yml. " + f"Available endpoints: {available}" + ) + return endpoints[name_or_uuid].uuid + + +def list_files_recursive( + tc: globus_sdk.TransferClient, + endpoint_uuid: str, + path: str, + _relative_base: str = "", +) -> Set[str]: + """ + Recursively list all files on an endpoint, returning relative paths. + + :param tc: Globus TransferClient + :param endpoint_uuid: The endpoint UUID + :param path: Absolute path on the endpoint to scan + :param _relative_base: Internal use for building relative paths + + :returns: Set of relative file paths (relative to the initial path) + """ + files = set() + try: + contents = tc.operation_ls(endpoint_uuid, path=path) + for obj in contents: + rel_path = f"{_relative_base}/{obj['name']}" if _relative_base else obj["name"] + + if obj["type"] == "file": + files.add(rel_path) + elif obj["type"] == "dir": + subdir_path = f"{path.rstrip('/')}/{obj['name']}" + files.update( + list_files_recursive(tc, endpoint_uuid, subdir_path, rel_path) + ) + except globus_sdk.GlobusAPIError as err: + logger.error(f"Error listing {path}: {err.message}") + + return files + + +def print_endpoints() -> None: + """ + List all endpoints defined in config.yml. 
+ + :returns: None + """ + config = get_config() + endpoints = build_endpoints(config) + + print(f"\n{'Endpoint Name':<30} {'UUID':<40} {'Root Path'}") + print("-" * 100) + for name, ep in sorted(endpoints.items()): + print(f"{name:<30} {ep.uuid:<40} {ep.root_path}") + + +def main( + source: Optional[str] = typer.Option( + None, "--source", "-s", help="Source endpoint name from config.yml" + ), + source_uuid: Optional[str] = typer.Option( + None, "--source-uuid", help="Source endpoint UUID (alternative to --source)" + ), + source_path: Optional[str] = typer.Option( + None, "--source-path", help="Path on source endpoint" + ), + dest: Optional[str] = typer.Option( + None, "--dest", "-d", help="Destination endpoint name from config.yml" + ), + dest_uuid: Optional[str] = typer.Option( + None, "--dest-uuid", help="Destination endpoint UUID (alternative to --dest)" + ), + dest_path: Optional[str] = typer.Option( + None, "--dest-path", help="Path on destination endpoint" + ), + output_file: Optional[str] = typer.Option( + None, "--output", "-o", help="Write missing files to this file (one per line)" + ), + show_matching: bool = typer.Option( + False, "--show-matching", "-m", help="Also print files that exist on both endpoints" + ), + list_endpoints: bool = typer.Option( + False, "--list-endpoints", help="List available endpoints from config.yml and exit" + ), + logout: bool = typer.Option( + False, "--logout", help="Remove cached tokens and exit" + ), + verbose: bool = typer.Option( + False, "--verbose", "-v", help="Show detailed logging output" + ), +) -> Optional[List[str]]: + """ + Compare files between source and destination Globus endpoints. + + Reports files that exist on source but are missing from destination. + + Authentication: Uses GLOBUS_CLIENT_ID/GLOBUS_CLIENT_SECRET if both are set, + otherwise uses cached tokens or prompts for browser login. 
+ + :param source: Source endpoint name from config.yml + :param source_uuid: Source endpoint UUID (alternative to --source) + :param source_path: Path on source endpoint + :param dest: Destination endpoint name from config.yml + :param dest_uuid: Destination endpoint UUID (alternative to --dest) + :param dest_path: Path on destination endpoint + :param output_file: Write missing files to this file (one per line) + :param show_matching: Also print files that exist on both endpoints + :param list_endpoints: List available endpoints from config.yml and exit + :param logout: Remove cached tokens and exit + :returns: List of missing file paths, or None if listing endpoints or logging out + """ + # Setup logging + log_level = logging.INFO if verbose else logging.WARNING + logging.basicConfig(level=log_level, format="%(levelname)s: %(message)s", force=True) + + # Handle --logout flag + if logout: + if TOKEN_FILE.exists(): + TOKEN_FILE.unlink() + print("Logged out (removed cached tokens)") + else: + print("No cached tokens to remove") + return None + + # Handle --list-endpoints flag + if list_endpoints: + print_endpoints() + return None + + # Validate required options for comparison + if not source_path: + raise typer.BadParameter("--source-path is required") + if not dest_path: + raise typer.BadParameter("--dest-path is required") + + # Resolve endpoint UUIDs + if source: + src_uuid = resolve_endpoint_uuid(source) + elif source_uuid: + src_uuid = source_uuid + else: + raise typer.BadParameter("Either --source or --source-uuid is required") + + if dest: + dst_uuid = resolve_endpoint_uuid(dest) + elif dest_uuid: + dst_uuid = dest_uuid + else: + raise typer.BadParameter("Either --dest or --dest-uuid is required") + + # Initialize transfer client + tc = get_transfer_client() + + # Ensure paths start with / (Globus prepends /~/ to relative paths) + if not source_path.startswith("/"): + source_path = "/" + source_path + if not dest_path.startswith("/"): + dest_path = "/" + 
dest_path + + # List files on both endpoints + logger.info(f"Scanning source: {source or src_uuid} at {source_path}") + source_files = list_files_recursive(tc, src_uuid, source_path) + logger.info(f"Found {len(source_files)} files on source") + + logger.info(f"Scanning destination: {dest or dst_uuid} at {dest_path}") + dest_files = list_files_recursive(tc, dst_uuid, dest_path) + logger.info(f"Found {len(dest_files)} files on destination") + + # Find missing and matching files + missing = sorted(source_files - dest_files) + matching = sorted(source_files & dest_files) + + # Report results + print(f"\n{'=' * 60}") + print(f"Source: {source or src_uuid} ({source_path})") + print(f"Destination: {dest or dst_uuid} ({dest_path})") + print(f"{'=' * 60}") + print(f"Files on source: {len(source_files)}") + print(f"Files on destination: {len(dest_files)}") + print(f"Missing from dest: {len(missing)}") + print(f"{'=' * 60}") + + if show_matching and matching: + print("\nMatching files:") + for f in matching: + print(f" ✓ {f}") + + if missing: + print("\nMissing files:") + for f in missing: + print(f" {f}") + + if output_file: + with open(output_file, "w") as fp: + fp.write("\n".join(missing)) + print(f"\nWrote {len(missing)} paths to {output_file}") + else: + print("\n✓ All files are synced! No missing files found.") + + return missing + + +if __name__ == "__main__": + typer.run(main) diff --git a/scripts/check_prefect_server_statuses.py b/scripts/check_prefect_server_statuses.py new file mode 100644 index 00000000..5c76ec16 --- /dev/null +++ b/scripts/check_prefect_server_statuses.py @@ -0,0 +1,548 @@ +""" +Prefect Server Status Checker + +Checks the health and run status of multiple self-hosted Prefect servers, +displaying deployment-level summaries with failure rates and recent failure times. 
+ +Environment Variables: + PREFECT_API_KEY API key for splash_auth servers (BL7011, BL832) + KC_USERNAME Keycloak username for keycloak-protected servers + KC_PASSWORD Keycloak password for keycloak-protected servers +These can be set in a .env file in the same directory. + +Usage: + python prefect_status.py # Last 24 hours (default) + python prefect_status.py --hours 168 # Last 7 days + python prefect_status.py -H 72 # Last 3 days + python prefect_status.py -f # List failed flow run names +""" + +import argparse +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from dotenv import load_dotenv +from enum import Enum +import httpx +import os + + +load_dotenv() + + +# ANSI color codes +class Color: + """ + ANSI color codes for terminal output. + """ + RED = "\033[91m" + GREEN = "\033[92m" + YELLOW = "\033[93m" + BLUE = "\033[94m" + DIM = "\033[2m" + BOLD = "\033[1m" + RESET = "\033[0m" + + +class AuthType(Enum): + """ + Enumeration of authentication types. + """ + SPLASH_AUTH = "splash_auth" + KEYCLOAK = "keycloak" + + +class PrefectServer(Enum): + """ + Enumeration of Prefect servers with their URLs and auth types. + """ + dichroism = ("https://flow-dichroism.als.lbl.gov", AuthType.KEYCLOAK) + BL7011 = ("https://flow-xpcs.als.lbl.gov", AuthType.SPLASH_AUTH) + BL733 = ("https://flow-733.als.lbl.gov", AuthType.KEYCLOAK) + BL832 = ("https://flow-prd.als.lbl.gov", AuthType.SPLASH_AUTH) + BL931 = ("https://flow-931.als.lbl.gov", AuthType.KEYCLOAK) + + @property + def url(self) -> str: + return self.value[0] + + @property + def auth_type(self) -> AuthType: + return self.value[1] + + +class StateType(Enum): + """ + Enumeration of possible run states. 
+ """ + SCHEDULED = "SCHEDULED" + PENDING = "PENDING" + RUNNING = "RUNNING" + COMPLETED = "COMPLETED" + FAILED = "FAILED" + CANCELLED = "CANCELLED" + CANCELLING = "CANCELLING" + CRASHED = "CRASHED" + PAUSED = "PAUSED" + + +BAD_STATES = {StateType.FAILED, StateType.CRASHED, StateType.CANCELLED} + + +@dataclass +class FailedRun: + """ + Info about a failed flow run. + + :param name: Name of the flow run. + :param state: State of the flow run. + :param time: Datetime of the run end or start. + :param message: Error message, if any. + """ + name: str + state: str + time: datetime | None + message: str | None = None + + +@dataclass +class DeploymentSummary: + """ + Summary of a deployment's run statuses. + + :param name: Name of the deployment. + :param counts: Dictionary mapping StateType to count of runs. + :param last_failure_time: Datetime of the last failure, if any. + """ + name: str + counts: dict[StateType, int] = field(default_factory=dict) + last_failure_time: datetime | None = None + failed_runs: list[FailedRun] = field(default_factory=list) + + @property + def total(self) -> int: + """ + Total number of runs. + + :return: Total count of runs. + """ + return sum(self.counts.values()) + + @property + def failure_count(self) -> int: + """ + Number of failed runs. + + :return: Count of failed runs. + """ + return sum(self.counts.get(s, 0) for s in BAD_STATES) + + @property + def healthy(self) -> bool: + """ + Whether the deployment is healthy (no failures). + + :return: True if healthy, False otherwise. + """ + return self.failure_count == 0 + + def format_time_ago(self, dt: datetime) -> str: + """ + Format a datetime as 'X hours/days ago'. + + :param dt: Datetime to format. + :return: Formatted string. 
+ """ + delta = datetime.now(timezone.utc) - dt + hours = delta.total_seconds() / 3600 + if hours < 1: + minutes = int(delta.total_seconds() / 60) + return f"{minutes}m ago" + elif hours < 24: + return f"{int(hours)}h ago" + else: + days = int(hours / 24) + return f"{days}d ago" + + def __str__(self) -> str: + """ + String representation of the deployment summary. + + :return: Formatted string. + """ + if self.total == 0: + return f"{self.name}: {Color.DIM}no runs{Color.RESET}" + + parts = [] + for state, count in self.counts.items(): + if count > 0: + if state in BAD_STATES: + pct = count / self.total * 100 + parts.append(f"{Color.RED}{state.value.lower()}: {count} ({pct:.1f}%){Color.RESET}") + elif state == StateType.COMPLETED: + parts.append(f"{Color.GREEN}{state.value.lower()}: {count}{Color.RESET}") + elif state == StateType.RUNNING: + parts.append(f"{Color.BLUE}{state.value.lower()}: {count}{Color.RESET}") + elif state == StateType.SCHEDULED: + parts.append(f"{Color.YELLOW}{state.value.lower()}: {count}{Color.RESET}") + elif state == StateType.PENDING: + parts.append(f"{Color.YELLOW}{state.value.lower()}: {count}{Color.RESET}") + elif state == StateType.PAUSED: + parts.append(f"{Color.YELLOW}{state.value.lower()}: {count}{Color.RESET}") + else: + parts.append(f"{state.value.lower()}: {count}") + + result = f"{self.name}: {', '.join(parts)}" + + if self.last_failure_time: + result += f" {Color.DIM}(last failure: {self.format_time_ago(self.last_failure_time)}){Color.RESET}" + + return result + + +@dataclass +class ServerSummary: + """ + Summary of a Prefect server's health and deployment statuses. + + :param server: PrefectServer instance. + :param healthy: Whether the server is considered healthy. + :param reachable: Whether the server is reachable. + :param auth_ok: Whether authentication is successful. + :param deployments: List of DeploymentSummary instances. 
+ """ + server: PrefectServer + healthy: bool + reachable: bool + auth_ok: bool + deployments: list[DeploymentSummary] = field(default_factory=list) + error: str | None = None + + @property + def total_runs(self) -> int: + """ + Total number of runs across all deployments. + + :return: Total run count. + """ + return sum(d.total for d in self.deployments) + + @property + def total_failures(self) -> int: + """ + Total number of failed runs across all deployments. + + :return: Total failure count. + """ + return sum(d.failure_count for d in self.deployments) + + +def get_keycloak_token() -> str: + """ + Fetch a fresh Keycloak access token. + + :return: Access token string. + """ + resp = httpx.post( + "https://comp-auth.als.lbl.gov/realms/als-computing/protocol/openid-connect/token", + data={ + "client_id": "als-computing-api", + "grant_type": "password", + "username": os.environ["KC_USERNAME"], + "password": os.environ["KC_PASSWORD"], + "scope": "openid email profile", + }, + ) + resp.raise_for_status() + return resp.json()["access_token"] + + +def get_client(server: PrefectServer) -> httpx.Client: + """ + Create an HTTP client with the appropriate auth for the server. + + :param server: PrefectServer to connect to. + :return: Configured httpx.Client. + """ + if server.auth_type == AuthType.SPLASH_AUTH: + token = os.environ.get("PREFECT_API_KEY") + if not token: + raise ValueError("PREFECT_API_KEY not set in environment") + else: + token = get_keycloak_token() + + return httpx.Client( + headers={"Authorization": f"Bearer {token}"}, + timeout=10, + ) + + +def format_hours(hours: int) -> str: + """ + Format hours as a human-readable string. + + :param hours: Number of hours. + :return: Formatted string. 
+ """ + if hours < 24: + return f"last {hours}h" + elif hours % 24 == 0: + days = hours // 24 + return f"last {days}d" + else: + days = hours / 24 + return f"last {days:.1f}d" + + +def check_server_health(server: PrefectServer) -> tuple[bool, bool]: + """ + Check if a Prefect server is reachable. Returns (reachable, auth_ok). + + :param server: PrefectServer to check. + :return: Tuple indicating if the server is reachable and if auth is OK. + """ + try: + with get_client(server) as client: + resp = client.get(f"{server.url}/api/health", follow_redirects=False) + + if resp.status_code == 200: + return True, True + elif resp.status_code in (302, 307): + return True, False + else: + return False, False + except httpx.RequestError: + return False, False + + +def get_deployment_summaries(server: PrefectServer, hours: int = 24) -> list[DeploymentSummary]: + """ + Get run summary per deployment for the last N hours. + + :param server: PrefectServer to query. + :param hours: Number of hours to look back. + :return: List of DeploymentSummary objects. + """ + since = datetime.now(timezone.utc) - timedelta(hours=hours) + + with get_client(server) as client: + resp = client.post(f"{server.url}/api/deployments/filter", json={"limit": 200}) + resp.raise_for_status() + deployments = resp.json() + + # Paginate through all flow runs with progress + all_runs = [] + offset = 0 + print(f" {Color.DIM}fetching runs...{Color.RESET}", end="", flush=True) + while True: + resp = client.post( + f"{server.url}/api/flow_runs/filter", + json={ + "flow_runs": { + "start_time": {"after_": since.isoformat()} + }, + "limit": 200, + "offset": offset, + }, + ) + resp.raise_for_status() + batch = resp.json() + if not batch: + break + all_runs.extend(batch) + print(f"\r {Color.DIM}fetching runs... 
{len(all_runs)}{Color.RESET}", end="", flush=True) + if len(batch) < 200: + break + offset += 200 + print(f"\r {Color.DIM}fetched {len(all_runs)} runs{Color.RESET} ") + + # Group runs by deployment + runs_by_deployment: dict[str, list[dict]] = {} + for run in all_runs: + dep_id = run.get("deployment_id") + if dep_id: + runs_by_deployment.setdefault(dep_id, []).append(run) + + summaries = [] + for dep in deployments: + dep_id = dep["id"] + dep_name = dep["name"] + runs = runs_by_deployment.get(dep_id, []) + + counts = {} + last_failure_time = None + failed_runs = [] + + for state_type in StateType: + matching_runs = [r for r in runs if r["state_type"] == state_type.value] + count = len(matching_runs) + if count > 0: + counts[state_type] = count + + # Track failed runs and most recent failure time + if state_type in BAD_STATES: + for r in matching_runs: + end_time = r.get("end_time") or r.get("start_time") + run_time = None + if end_time: + try: + run_time = datetime.fromisoformat(end_time) + if last_failure_time is None or run_time > last_failure_time: + last_failure_time = run_time + except (ValueError, TypeError): + pass + + # Extract error message from state + state_obj = r.get("state", {}) + message = state_obj.get("message") if isinstance(state_obj, dict) else None + + failed_runs.append(FailedRun( + name=r.get("name", "unknown"), + state=state_type.value, + time=run_time, + message=message, + )) + + # Sort failed runs by time (most recent first) + failed_runs.sort(key=lambda r: r.time or datetime.min.replace(tzinfo=timezone.utc), reverse=True) + + summary = DeploymentSummary(dep_name, counts, last_failure_time, failed_runs) + summaries.append(summary) + + # Sort: unhealthy first, then by failure count descending + summaries.sort(key=lambda s: (-int(not s.healthy), -s.failure_count, s.name)) + + return summaries + + +def get_server_summary(server: PrefectServer, hours: int = 24, show_failed: bool = False) -> ServerSummary: + """ + Get full summary for a server. 
+ + :param server: PrefectServer to check. + :param hours: Number of hours to look back for run data. + :param show_failed: Whether to list failed flow run names. + :return: ServerSummary object. + """ + print(f"\n{Color.BOLD}{server.name}{Color.RESET} ({server.url}) [{format_hours(hours)}]") + print("-" * 50) + + reachable, auth_ok = check_server_health(server) + + if not reachable: + print(f" {Color.RED}✗ unreachable{Color.RESET}") + return ServerSummary(server, healthy=False, reachable=False, auth_ok=False) + + if not auth_ok: + print(f" {Color.YELLOW}⚠ auth rejected{Color.RESET}") + return ServerSummary(server, healthy=False, reachable=True, auth_ok=False) + + try: + deployments = get_deployment_summaries(server, hours) + + # Server summary line + total_runs = sum(d.total for d in deployments) + total_failures = sum(d.failure_count for d in deployments) + unhealthy_count = sum(1 for d in deployments if not d.healthy) + + if total_failures > 0: + failure_pct = total_failures / total_runs * 100 if total_runs > 0 else 0 + print(f" {Color.RED}{total_runs} runs, {total_failures} failures ({failure_pct:.1f}%), " + f"{unhealthy_count} unhealthy deployments{Color.RESET}") + else: + print(f" {Color.GREEN}{total_runs} runs, all healthy{Color.RESET}") + + # Per-deployment breakdown + for summary in deployments: + status = f"{Color.GREEN}✓{Color.RESET}" if summary.healthy else f"{Color.RED}✗{Color.RESET}" + print(f" {status} {summary}") + + # Show failed run names if requested + if show_failed and summary.failed_runs: + for run in summary.failed_runs: + time_str = f" ({summary.format_time_ago(run.time)})" if run.time else "" + print(f" {Color.DIM}└ {run.name} [{run.state.lower()}]{time_str}{Color.RESET}") + if run.message: + # Indent each line of the message + for line in run.message.splitlines(): + print(f" {Color.DIM}{line}{Color.RESET}") + + healthy = all(d.healthy for d in deployments) + return ServerSummary(server, healthy=healthy, reachable=True, auth_ok=True, 
deployments=deployments) + + except httpx.HTTPStatusError as e: + error_msg = f"{e.response.status_code} - {e.response.text[:100]}" + print(f" {Color.RED}✗ Error: {error_msg}{Color.RESET}") + return ServerSummary(server, healthy=False, reachable=True, auth_ok=True, error=error_msg) + + +def get_all_servers_summary(hours: int = 24, show_failed: bool = False) -> dict[PrefectServer, ServerSummary]: + """ + Get summaries for all configured Prefect servers. + + :param hours: Number of hours to look back for run data. + :param show_failed: Whether to list failed flow run names. + :return: Dictionary mapping PrefectServer to ServerSummary. + """ + summaries = {} + for server in PrefectServer: + summaries[server] = get_server_summary(server, hours, show_failed) + + # Overall summary + print(f"\n{'=' * 50}") + print(f"{Color.BOLD}OVERALL SUMMARY{Color.RESET}") + print("=" * 50) + + total_servers = len(summaries) + healthy_servers = sum(1 for s in summaries.values() if s.healthy) + unreachable = sum(1 for s in summaries.values() if not s.reachable) + auth_issues = sum(1 for s in summaries.values() if s.reachable and not s.auth_ok) + + total_runs = sum(s.total_runs for s in summaries.values()) + total_failures = sum(s.total_failures for s in summaries.values()) + failure_pct = total_failures / total_runs * 100 if total_runs > 0 else 0 + + # Server status + status_parts = [f"{healthy_servers}/{total_servers} servers healthy"] + if unreachable: + status_parts.append(f"{Color.RED}{unreachable} unreachable{Color.RESET}") + if auth_issues: + status_parts.append(f"{Color.YELLOW}{auth_issues} auth issues{Color.RESET}") + print(", ".join(status_parts)) + + # Run stats + if total_runs > 0: + if total_failures > 0: + print(f"{total_runs:,} total runs, {Color.RED}{total_failures:,} failures ({failure_pct:.1f}%){Color.RESET}") + else: + print(f"{Color.GREEN}{total_runs:,} total runs, all successful{Color.RESET}") + + return summaries + + +def main() -> None: + """ + Main function to 
parse arguments and run the status check. + + :param hours: Number of hours to look back. (--hours or -H) + :param show_failed: Whether to list failed flow run names. (--show-failed or -f) + :return: None + """ + parser = argparse.ArgumentParser(description="Check Prefect server status") + parser.add_argument( + "--hours", "-H", + type=int, + default=24, + help="Number of hours to look back (default: 24, i.e. 1 day)" + ) + parser.add_argument( + "--show-failed", "-f", + action="store_true", + help="List failed flow run names under each unhealthy deployment" + ) + args = parser.parse_args() + + get_all_servers_summary(hours=args.hours, show_failed=args.show_failed) + + +if __name__ == "__main__": + main()