Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion bedrock_keys_security/commands/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@
@click.command()
@click.option('--json', 'output_json', is_flag=True, help='Output results as JSON')
@click.option('--csv', 'csv_file', default=None, metavar='FILE', help='Export results to CSV file')
@click.option('--verbose', '-v', is_flag=True, help='Enable verbose log output during scan')
@click.pass_context
def scan(ctx, output_json, csv_file):
def scan(ctx, output_json, csv_file, verbose):
"""Scan for phantom IAM users (default command)"""
# Merge: scan-level --verbose enables verbose on the context
if verbose:
ctx.obj.verbose = True

scanner = ctx.obj.scanner

if not output_json:
Expand Down
139 changes: 113 additions & 26 deletions bedrock_keys_security/core/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ def check_credentials(self, username: str) -> Dict:
active_creds = [c for c in credentials if c['Status'] == 'Active']

if self.verbose and active_creds:
click.echo(output.yellow(f" [CRED] {username}: {len(active_creds)} active Bedrock API key(s)"))
key_word = "key" if len(active_creds) == 1 else "keys"
output.warning(f"{username}: {len(active_creds)} active Bedrock API {key_word}")

return {
'bedrock_credentials': len(credentials),
Expand Down Expand Up @@ -112,7 +113,8 @@ def check_access_keys(self, username: str) -> Dict:
active_keys = [k for k in access_keys if k['Status'] == 'Active']

if active_keys and self.verbose:
output.high_risk(f"{username}: {len(active_keys)} IAM access key(s) found (AT RISK)")
key_word = "key" if len(active_keys) == 1 else "keys"
output.high_risk(f"{username}: {len(active_keys)} IAM access {key_word} found (AT RISK)")

return {
'access_keys': len(access_keys),
Expand Down Expand Up @@ -176,15 +178,15 @@ def delete_phantom_user(self, username: str, dry_run: bool = False) -> bool:

access_keys = self.iam.list_access_keys(UserName=username)['AccessKeyMetadata']
if access_keys:
click.echo(output.yellow(f" - Would delete {len(access_keys)} access key(s)"))
click.echo(output.yellow(f" - Would delete {len(access_keys)} access {'key' if len(access_keys) == 1 else 'keys'}"))

service_creds = self.iam.list_service_specific_credentials(UserName=username)['ServiceSpecificCredentials']
if service_creds:
click.echo(output.yellow(f" - Would delete {len(service_creds)} Bedrock API key(s)"))
click.echo(output.yellow(f" - Would delete {len(service_creds)} Bedrock API {'key' if len(service_creds) == 1 else 'keys'}"))

attached = self.iam.list_attached_user_policies(UserName=username)['AttachedPolicies']
if attached:
click.echo(output.yellow(f" - Would detach {len(attached)} managed polic(y/ies)"))
click.echo(output.yellow(f" - Would detach {len(attached)} managed {'policy' if len(attached) == 1 else 'policies'}"))

inline = self.iam.list_user_policies(UserName=username)['PolicyNames']
if inline:
Expand Down Expand Up @@ -266,7 +268,7 @@ def cleanup_orphaned_users(self, phantoms: List[Dict], dry_run: bool = False, fo

# Confirmation prompt (unless forced or dry-run)
if not dry_run and not force:
if not click.confirm(click.style(f"Delete {len(orphaned_users)} orphaned phantom user(s)?", fg="yellow"), default=False):
if not click.confirm(click.style(f"Delete {len(orphaned_users)} orphaned phantom {'user' if len(orphaned_users) == 1 else 'users'}?", fg="yellow"), default=False):
output.info("Cleanup cancelled by user.")
return {'total': len(orphaned_users), 'deleted': 0, 'failed': 0}

Expand Down Expand Up @@ -380,7 +382,7 @@ def generate_timeline(self, username: str, days: int = 7) -> None:
click.echo(output.yellow(f"No CloudTrail events found for {username}") + "\n")
return

click.echo(f"{output.bold(f'Found {len(events)} event(s):')}\n")
click.echo(f"{output.bold(f'Found {len(events)} {"event" if len(events) == 1 else "events"}:')}\n")

for event in events:
event_data = json.loads(event['CloudTrailEvent'])
Expand Down Expand Up @@ -449,7 +451,7 @@ def generate_incident_report(self, username: str, output_file: Optional[str] = N
access_keys = self.iam.list_access_keys(UserName=username)['AccessKeyMetadata']

if access_keys:
report_lines.append(f" \u26a0\ufe0f WARNING: {len(access_keys)} IAM access key(s) found!")
report_lines.append(f" \u26a0\ufe0f WARNING: {len(access_keys)} IAM access {'key' if len(access_keys) == 1 else 'keys'} found!")
for key in access_keys:
report_lines.append(f" Key ID: {key['AccessKeyId']}")
report_lines.append(f" Status: {key['Status']}")
Expand Down Expand Up @@ -517,6 +519,35 @@ def report_header(self) -> str:
lines.append(f"Region: {self.region}")
return '\n'.join(lines)

def _format_summary(self, phantoms: List[Dict], total: int, active: int, orphaned: int, at_risk: int) -> List[str]:
"""Format the summary block shared between report methods"""
lines = []
lines.append(f"\n{output.bold('Summary:')}")
lines.append(f" Total phantom users: {output.cyan(str(total))}")
lines.append(f" Active: {output.green(str(active))}")
lines.append(f" Orphaned: {output.yellow(str(orphaned))} (safe to cleanup)")
lines.append(f" At Risk: {output.red(str(at_risk))} (IAM access keys found)")

if at_risk > 0:
lines.append(f"\n{click.style('AT RISK users detected:', fg='red', bold=True)}")
lines.append(output.red("These phantom users have IAM access keys (AKIA...) attached."))
lines.append(output.red("These keys grant bedrock:*, iam:ListRoles, kms:DescribeKey,"))
lines.append(output.red("ec2:Describe* and persist even if the API key is revoked. Investigate:"))
for user in phantoms:
if user['status'] == 'AT RISK':
n = user['active_access_keys']
key_label = "access key" if n == 1 else "access keys"
lines.append(output.red(f" - {user['username']} ({n} {key_label})"))
lines.append("")

if orphaned > 0:
lines.append(f"\n{output.yellow(f'{orphaned} orphaned phantom users can be cleaned up.')}")
lines.append(output.yellow("These users have no active credentials and can be safely deleted to reduce your attack surface."))
lines.append(output.yellow("Run: bks cleanup --dry-run to preview, or cleanup to delete."))
lines.append("")

return lines

def generate_table_report(self, phantoms: List[Dict]) -> str:
"""Generate formatted table report"""
if not phantoms:
Expand All @@ -543,28 +574,84 @@ def generate_table_report(self, phantoms: List[Dict]) -> str:
headers = ['Username', 'Created', 'Active API Keys', 'Access Keys', 'Status']
lines.append(tabulate(table_data, headers=headers, tablefmt='grid'))

lines.append(f"\n{output.bold('Summary:')}")
lines.append(f" Total phantom users: {output.cyan(str(total))}")
lines.append(f" Active: {output.green(str(active))}")
lines.append(f" Orphaned: {output.yellow(str(orphaned))} (safe to cleanup)")
lines.append(f" At Risk: {output.red(str(at_risk))} (IAM access keys found)")
lines.extend(self._format_summary(phantoms, total, active, orphaned, at_risk))

if at_risk > 0:
lines.append(f"\n{click.style('AT RISK users detected:', fg='red', bold=True)}")
lines.append(output.red("These phantom users have IAM access keys (AKIA...) attached."))
lines.append(output.red("These keys grant bedrock:*, iam:ListRoles, kms:DescribeKey,"))
lines.append(output.red("ec2:Describe* and persist even if the API key is revoked. Investigate:"))
for user in phantoms:
if user['status'] == 'AT RISK':
lines.append(output.red(f" - {user['username']} ({user['active_access_keys']} access keys)"))
lines.append("")
return '\n'.join(lines)

def generate_verbose_table_report(self, phantoms: List[Dict]) -> str:
"""Generate verbose report with detailed per-user information"""
if not phantoms:
return f"\n{output.green('No phantom users found in this account.')}\n"

total = len(phantoms)
active = len([u for u in phantoms if u['status'] == 'ACTIVE'])
orphaned = len([u for u in phantoms if u['status'] == 'ORPHANED'])
at_risk = len([u for u in phantoms if u['status'] == 'AT RISK'])

lines = []
lines.append(f"\n{output.bold(f'Found {total} phantom {"user" if total == 1 else "users"}')}\n")

for i, user in enumerate(phantoms):
status = output.style_status(user['status'])
lines.append(output.bold('─' * 60))
lines.append(f" {output.bold(output.cyan(user['username']))} [{status}]")
lines.append(output.bold('─' * 60))

# Identity
lines.append(f" User ID: {user['user_id']}")
lines.append(f" ARN: {user['arn']}")
created = user['created']
if hasattr(created, 'strftime'):
lines.append(f" Created: {created.strftime('%Y-%m-%d %H:%M:%S UTC')}")
else:
lines.append(f" Created: {created}")
lines.append(f" Path: {user.get('path', '/')}")

# Bedrock credentials
active_creds = user.get('active_bedrock_credentials', 0)
total_creds = user.get('bedrock_credentials', 0)
cred_color = output.green if active_creds == 0 else output.yellow
lines.append(f"\n Bedrock API Keys: {cred_color(f'{active_creds} active')} / {total_creds} total")

for cred in user.get('credential_details', []):
cred_id = cred.get('ServiceSpecificCredentialId', 'N/A')
cred_status = cred.get('Status', 'N/A')
cred_created = cred.get('CreateDate', '')
if hasattr(cred_created, 'strftime'):
cred_created = cred_created.strftime('%Y-%m-%d %H:%M:%S UTC')
lines.append(f" • {cred_id} status={cred_status} created={cred_created}")

# Access keys
active_ak = user.get('active_access_keys', 0)
total_ak = user.get('access_keys', 0)
ak_color = output.green if active_ak == 0 else output.red
lines.append(f"\n IAM Access Keys: {ak_color(f'{active_ak} active')} / {total_ak} total")

for key_id in user.get('access_key_ids', []):
lines.append(f" • {output.red(key_id)}")

# Policies
attached = user.get('attached_policies', [])
inline = user.get('inline_policies', [])
lines.append(f"\n Policies: {user.get('total_policies', 0)} total")

if attached:
lines.append(" Managed:")
for p in attached:
lines.append(f" • {p}")
if inline:
lines.append(" Inline:")
for p in inline:
lines.append(f" • {p}")
if not attached and not inline:
lines.append(" (none)")

if orphaned > 0:
lines.append(f"\n{output.yellow(f'{orphaned} orphaned phantom users can be cleaned up.')}")
lines.append(output.yellow("These users have no active credentials and can be safely deleted to reduce your attack surface."))
lines.append(output.yellow("Run: bks cleanup --dry-run to preview, or cleanup to delete."))
lines.append("")

# Summary
lines.append(output.bold('═' * 60))
lines.extend(self._format_summary(phantoms, total, active, orphaned, at_risk))

return '\n'.join(lines)

def generate_json_report(self, phantoms: List[Dict]) -> str:
Expand Down
71 changes: 57 additions & 14 deletions bedrock_keys_security/utils/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,64 @@

import sys
import threading
import time
import click
from contextlib import contextmanager
from typing import Dict

_spinner_lock = threading.Lock()
_spinner_active = False
_spinner_label_len = 0
_spinner_label = ""


def _clear_spinner_line():
"""Clear the spinner line from stderr for clean log output"""
if _spinner_active:
sys.stderr.write("\r" + " " * (_spinner_label_len + 3) + "\r")
sys.stderr.flush()


def _redraw_spinner():
"""Redraw the spinner line on stderr after log output"""
if _spinner_active:
frame = click.style("\u280b", fg="cyan")
sys.stderr.write(f"\r{frame} {_spinner_label}")
sys.stderr.flush()


def info(msg: str) -> None:
click.echo(click.style(f"[INFO] {msg}", fg="cyan"))
with _spinner_lock:
_clear_spinner_line()
click.echo(click.style(f"[INFO] {msg}", fg="cyan"))
_redraw_spinner()


def success(msg: str) -> None:
click.echo(click.style(f"[SUCCESS] {msg}", fg="green"))
with _spinner_lock:
_clear_spinner_line()
click.echo(click.style(f"[SUCCESS] {msg}", fg="green"))
_redraw_spinner()


def warning(msg: str) -> None:
click.echo(click.style(f"[WARNING] {msg}", fg="yellow"))
with _spinner_lock:
_clear_spinner_line()
click.echo(click.style(f"[WARNING] {msg}", fg="yellow"))
_redraw_spinner()


def error(msg: str) -> None:
click.echo(click.style(f"[ERROR] {msg}", fg="red"), err=True)
with _spinner_lock:
_clear_spinner_line()
click.echo(click.style(f"[ERROR] {msg}", fg="red"), err=True)
_redraw_spinner()


def high_risk(msg: str) -> None:
click.echo(click.style(f"[HIGH RISK] {msg}", fg="red"))
with _spinner_lock:
_clear_spinner_line()
click.echo(click.style(f"[HIGH RISK] {msg}", fg="red"))
_redraw_spinner()


def bold(text: str) -> str:
Expand All @@ -51,27 +85,36 @@ def cyan(text: str) -> str:
@contextmanager
def spinner(label="Scanning"):
"""Simple threaded spinner for indeterminate progress"""
frames = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
global _spinner_active, _spinner_label_len, _spinner_label
frames = ["\u280b", "\u2819", "\u2839", "\u2838", "\u283c", "\u2834", "\u2826", "\u2827", "\u2807", "\u280f"]
stop = threading.Event()

def spin():
i = 0
while not stop.is_set():
frame = click.style(frames[i % len(frames)], fg="cyan")
sys.stderr.write(f"\r{frame} {label}")
sys.stderr.flush()
with _spinner_lock:
if not stop.is_set():
frame = click.style(frames[i % len(frames)], fg="cyan")
sys.stderr.write(f"\r{frame} {label}")
sys.stderr.flush()
i += 1
stop.wait(0.08)
sys.stderr.write("\r" + " " * (len(label) + 3) + "\r")
sys.stderr.flush()

with _spinner_lock:
_spinner_active = True
_spinner_label_len = len(label)
_spinner_label = label
t = threading.Thread(target=spin, daemon=True)
t.start()
try:
yield
finally:
stop.set()
t.join()
with _spinner_lock:
_spinner_active = False


def style_status(status: str) -> str:
Expand All @@ -88,9 +131,9 @@ def format_decode_table_output(result: Dict) -> str:
return f"\n{red('[ERROR] ' + result['error'])}\n"

lines = []
lines.append(f"\n{bold('' * 60)}")
lines.append(f"\n{bold('\u2500' * 60)}")
lines.append(f"{bold(cyan(' Bedrock API Key Analysis'))}")
lines.append(f"{bold('' * 60)}")
lines.append(f"{bold('\u2500' * 60)}")

key_type = result.get("type", "Unknown")
type_label = green("Long-term (ABSK)") if key_type == "long-term" else yellow("Short-term")
Expand All @@ -113,8 +156,8 @@ def format_decode_table_output(result: Dict) -> str:
if "security_notes" in result and result["security_notes"]:
lines.append(f"\n {bold(yellow('Security Notes:'))}")
for note in result["security_notes"]:
lines.append(f" {yellow(' ' + note)}")
lines.append(f" {yellow(' \u2022 ' + note)}")

lines.append(f"{bold('' * 60)}\n")
lines.append(f"{bold('\u2500' * 60)}\n")

return "\n".join(lines)
Loading