Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions strix/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class Config:
# Telemetry
strix_telemetry = "1"

# Webhook
strix_webhook_url: str | None = None
strix_webhook_format = "generic"

# Config file override (set via --config CLI arg)
_config_file_override: Path | None = None

Expand Down
30 changes: 29 additions & 1 deletion strix/interface/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
validate_config_file,
validate_llm_response,
)
from strix.interface.webhooks import send_completion_webhook # noqa: E402
from strix.runtime.docker_runtime import HOST_GATEWAY_HOSTNAME # noqa: E402
from strix.telemetry import posthog # noqa: E402
from strix.telemetry.tracer import get_global_tracer # noqa: E402
Expand Down Expand Up @@ -366,6 +367,23 @@ def parse_arguments() -> argparse.Namespace:
help="Path to a custom config file (JSON) to use instead of ~/.strix/cli-config.json",
)

parser.add_argument(
"--webhook-url",
type=str,
default=None,
help="URL to send scan results to on completion. "
"Supports generic JSON endpoints, Slack incoming webhooks, and Discord webhooks.",
)

parser.add_argument(
"--webhook-format",
type=str,
choices=["generic", "slack", "discord"],
default="generic",
help="Webhook payload format (default: generic). "
"Auto-detected from the URL when set to 'generic'.",
)

args = parser.parse_args()

if args.instruction and args.instruction_file:
Expand Down Expand Up @@ -520,7 +538,7 @@ def persist_config() -> None:
save_current_config()


def main() -> None:
def main() -> None: # noqa: PLR0912
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

Expand Down Expand Up @@ -578,6 +596,16 @@ def main() -> None:
results_path = Path("strix_runs") / args.run_name
display_completion_message(args, results_path)

webhook_url = args.webhook_url or Config.get("strix_webhook_url")
if webhook_url:
webhook_format = args.webhook_format or Config.get("strix_webhook_format") or "generic"
send_completion_webhook(
webhook_url=webhook_url,
webhook_format=webhook_format,
tracer=tracer,
args=args,
)
Comment on lines +599 to +607
Copy link

Copilot AI Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tracer variable is obtained from get_global_tracer() in the finally block at line 592, which can return None. However, it's passed to send_completion_webhook without checking if it's None first. This could lead to AttributeError when the webhook formatters try to access tracer attributes like tracer.vulnerability_reports or tracer.agents. Add a check to ensure tracer is not None before calling send_completion_webhook.

Copilot uses AI. Check for mistakes.

if args.non_interactive:
tracer = get_global_tracer()
if tracer and tracer.vulnerability_reports:
Expand Down
291 changes: 291 additions & 0 deletions strix/interface/webhooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
"""Webhook dispatcher for scan completion notifications.

Sends scan results to external services (Slack, Discord, or generic JSON endpoints)
when a penetration test completes.
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse

import requests


if TYPE_CHECKING:
import argparse


logger = logging.getLogger(__name__)

WEBHOOK_TIMEOUT = 10

# Platform limits for field truncation
_SLACK_SECTION_TEXT_LIMIT = 3000
_DISCORD_FIELD_VALUE_LIMIT = 1024
_DISCORD_TITLE_LIMIT = 256


def _truncate(text: str, limit: int) -> str:
"""Truncate *text* to *limit* characters, appending an ellipsis if trimmed."""
if len(text) <= limit:
return text
return text[: limit - 1] + "\u2026"


def send_completion_webhook(
webhook_url: str,
webhook_format: str,
tracer: Any,
args: argparse.Namespace,
) -> None:
"""Send scan completion results to a webhook URL.

Args:
webhook_url: The destination webhook URL.
webhook_format: One of ``"generic"``, ``"slack"``, or ``"discord"``.
tracer: The global :class:`Tracer` instance containing scan results.
args: Parsed CLI arguments (used to extract target info and run name).
"""
# Validate URL scheme
parsed = urlparse(webhook_url)
if parsed.scheme not in ("http", "https"):
logger.warning("Invalid webhook URL scheme %r — skipping delivery", parsed.scheme)
return

if not tracer:
logger.warning("No tracer available — skipping webhook delivery")
return

resolved_format = _resolve_format(webhook_url, webhook_format)

formatters: dict[str, Any] = {
"generic": _format_generic,
"slack": _format_slack,
"discord": _format_discord,
}

formatter = formatters.get(resolved_format, _format_generic)
payload = formatter(tracer, args)

try:
response = requests.post(webhook_url, json=payload, timeout=WEBHOOK_TIMEOUT)
Comment on lines +72 to +73
Copy link

Copilot AI Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The webhook URL is not validated before being used with requests.post. While webhook URLs are configured by the user (not untrusted input), it would be more defensive to validate that the URL has a proper scheme (http/https) to prevent unexpected behavior. Consider adding URL validation to check the scheme before making the request.

Copilot uses AI. Check for mistakes.
response.raise_for_status()
logger.info(
"Webhook delivered successfully to %s (status %s)",
webhook_url,
response.status_code,
)
except requests.RequestException as exc:
logger.warning("Failed to deliver webhook to %s: %s", webhook_url, exc)


# ---------------------------------------------------------------------------
# Format resolution
# ---------------------------------------------------------------------------


def _resolve_format(url: str, explicit_format: str) -> str:
"""Auto-detect the webhook format from the URL when the user chose ``"generic"``."""
if explicit_format != "generic":
return explicit_format

host = urlparse(url).hostname or ""
if "hooks.slack.com" in host:
return "slack"
if "discord.com" in host or "discordapp.com" in host:
return "discord"

return "generic"


# ---------------------------------------------------------------------------
# Payload helpers
# ---------------------------------------------------------------------------


def _targets_summary(args: argparse.Namespace) -> str:
targets_info: list[dict[str, Any]] = getattr(args, "targets_info", [])
if not targets_info:
return "unknown"
return ", ".join(t.get("original", "unknown") for t in targets_info)


def _vulnerability_summary(tracer: Any) -> list[dict[str, Any]]:
"""Return a lightweight list of vulnerability dicts safe for JSON serialisation."""
if not tracer:
return []
return [
{
"id": report.get("id", ""),
"title": report.get("title", ""),
"severity": report.get("severity", ""),
"cvss": report.get("cvss"),
"target": report.get("target", ""),
"endpoint": report.get("endpoint", ""),
"description": report.get("description", ""),
}
for report in tracer.vulnerability_reports
]


def _severity_counts(tracer: Any) -> dict[str, int]:
counts: dict[str, int] = {"critical": 0, "high": 0, "medium": 0, "low": 0, "info": 0}
if not tracer:
return counts
for report in tracer.vulnerability_reports:
severity = report.get("severity", "").lower()
if severity in counts:
counts[severity] += 1
return counts


def _scan_completed(tracer: Any) -> bool:
if tracer and tracer.scan_results:
return bool(tracer.scan_results.get("scan_completed", False))
return False


# ---------------------------------------------------------------------------
# Formatters
# ---------------------------------------------------------------------------


def _format_generic(tracer: Any, args: argparse.Namespace) -> dict[str, Any]:
"""Plain JSON payload with full scan data."""
completed = _scan_completed(tracer)
llm_stats = tracer.get_total_llm_stats()["total"] if tracer else {}
vuln_reports = tracer.vulnerability_reports if tracer else []
return {
"event": "scan_completed" if completed else "scan_ended",
"run_name": getattr(args, "run_name", ""),
"targets": _targets_summary(args),
"scan_mode": getattr(args, "scan_mode", ""),
"completed": completed,
"vulnerability_count": len(vuln_reports),
"severity_counts": _severity_counts(tracer),
"vulnerabilities": _vulnerability_summary(tracer),
"stats": {
"agents": len(tracer.agents) if tracer else 0,
"tools": tracer.get_real_tool_count() if tracer else 0,
"input_tokens": llm_stats.get("input_tokens", 0),
"output_tokens": llm_stats.get("output_tokens", 0),
"cost": llm_stats.get("cost", 0),
},
}
Comment on lines +155 to +176
Copy link

Copilot AI Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The formatters need to handle the case where tracer is None more consistently. Currently, _format_generic checks if tracer is truthy for llm_stats (line 132) but then accesses tracer.vulnerability_reports (line 139), tracer.agents (line 143), and tracer.get_real_tool_count() (line 144) without checking. If tracer is None, these will raise AttributeError. Either add None checks for all tracer accesses, or ensure the function is never called with None.

Copilot uses AI. Check for mistakes.


def _format_slack(tracer: Any, args: argparse.Namespace) -> dict[str, Any]:
"""Slack Block Kit payload."""
completed = _scan_completed(tracer)
vuln_reports = tracer.vulnerability_reports if tracer else []
vuln_count = len(vuln_reports)
counts = _severity_counts(tracer)

status_emoji = ":white_check_mark:" if completed else ":warning:"
status_text = "completed" if completed else "ended"

severity_line = (
" | ".join(f"*{sev.upper()}*: {cnt}" for sev, cnt in counts.items() if cnt > 0)
or "None found"
)

blocks: list[dict[str, Any]] = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"{status_emoji} Strix Scan {status_text.title()}",
"emoji": True,
},
},
{
"type": "section",
"fields": [
{"type": "mrkdwn", "text": f"*Target:*\n{_targets_summary(args)}"},
{"type": "mrkdwn", "text": f"*Run:*\n{getattr(args, 'run_name', 'N/A')}"},
{"type": "mrkdwn", "text": f"*Scan Mode:*\n{getattr(args, 'scan_mode', 'N/A')}"},
{"type": "mrkdwn", "text": f"*Vulnerabilities:*\n{vuln_count}"},
],
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*Severity Breakdown:* {severity_line}",
},
},
]

# Add top vulnerabilities (max 5)
for report in vuln_reports[:5]:
title = _truncate(report.get("title", "Untitled"), 200)
severity = report.get("severity", "unknown").upper()
endpoint = report.get("endpoint", "")
text = f":rotating_light: *[{severity}]* {title}"
if endpoint:
text += f"\n`{_truncate(endpoint, 200)}`"
text = _truncate(text, _SLACK_SECTION_TEXT_LIMIT)
blocks.append(
{
"type": "section",
"text": {"type": "mrkdwn", "text": text},
}
)

return {"blocks": blocks}
Comment on lines +179 to +237
Copy link

Copilot AI Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to _format_generic, this function accesses tracer.vulnerability_reports and tracer.agents without checking if tracer is None. Add None checks to prevent AttributeError when tracer is None.

Copilot uses AI. Check for mistakes.


def _format_discord(tracer: Any, args: argparse.Namespace) -> dict[str, Any]:
"""Discord webhook payload with an embed."""
completed = _scan_completed(tracer)
vuln_reports = tracer.vulnerability_reports if tracer else []
vuln_count = len(vuln_reports)
counts = _severity_counts(tracer)

color = 0x22C55E if completed else 0xEAB308 # green / yellow
if counts["critical"] > 0:
color = 0xDC2626
elif counts["high"] > 0:
color = 0xEA580C

severity_line = (
" | ".join(f"**{sev.upper()}**: {cnt}" for sev, cnt in counts.items() if cnt > 0)
or "None found"
)

fields: list[dict[str, Any]] = [
{
"name": "Target",
"value": _truncate(_targets_summary(args), _DISCORD_FIELD_VALUE_LIMIT),
"inline": True,
},
{"name": "Scan Mode", "value": getattr(args, "scan_mode", "N/A"), "inline": True},
{"name": "Vulnerabilities", "value": str(vuln_count), "inline": True},
{"name": "Severity Breakdown", "value": severity_line, "inline": False},
]

# Top vulnerabilities (max 5)
for report in vuln_reports[:5]:
title = _truncate(report.get("title", "Untitled"), 200)
severity = report.get("severity", "unknown").upper()
endpoint = report.get("endpoint", "")
value = f"**[{severity}]** {title}"
if endpoint:
value += f"\n`{_truncate(endpoint, 200)}`"
value = _truncate(value, _DISCORD_FIELD_VALUE_LIMIT)
fields.append({"name": "\u200b", "value": value, "inline": False})

status_text = "Scan Completed" if completed else "Scan Ended"
run_name = _truncate(getattr(args, "run_name", "N/A"), _DISCORD_TITLE_LIMIT)

embed: dict[str, Any] = {
"title": _truncate(f"\ud83d\udd12 Strix \u2014 {status_text}", _DISCORD_TITLE_LIMIT),
"description": f"Run: **{run_name}**",
"color": color,
"fields": fields,
"footer": {"text": "Strix Security Scanner"},
}

return {"embeds": [embed]}
Comment on lines +240 to +291
Copy link

Copilot AI Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to other formatters, this function accesses tracer.vulnerability_reports without checking if tracer is None. Add None checks to prevent AttributeError when tracer is None.

Copilot uses AI. Check for mistakes.
Loading