diff --git a/scripts/check_live_bounty_closing_refs.py b/scripts/check_live_bounty_closing_refs.py index 227ded65..1a92988c 100644 --- a/scripts/check_live_bounty_closing_refs.py +++ b/scripts/check_live_bounty_closing_refs.py @@ -1,252 +1,253 @@ -from __future__ import annotations - -import argparse -import json -import subprocess -import sys -import urllib.error -import urllib.request -from pathlib import Path -from typing import Any - -if __package__ in {None, ""}: - sys.path.insert(0, str(Path(__file__).resolve().parents[1])) - -from scripts.api_host_args import public_api_host -from scripts.bounty_refs import GITHUB_CLOSING_ISSUE_RE - -DEFAULT_API_HOST = "https://api.mrwk.online" -GH_TIMEOUT_SECONDS = 30 -GH_PR_SAFETY_CAP = 200 -MAX_BOUNTY_REF = 2**63 - 1 - - -def _int_or_none(value: Any) -> int | None: - try: - return int(value) - except (TypeError, ValueError): - return None - - -def _status_value(raw: dict[str, Any]) -> str: - return str(raw.get("status") or raw.get("state") or "").lower() - - -def _issue_number(raw: dict[str, Any]) -> int | None: - return _int_or_none(raw.get("issue_number", raw.get("number"))) - - -def _open_public_bounty_numbers(data: dict[str, Any]) -> set[int]: - numbers: set[int] = set() - for item in data.get("bounties", []): - if not isinstance(item, dict) or _status_value(item) != "open": - continue - issue_number = _issue_number(item) - if issue_number is not None: - numbers.add(issue_number) - return numbers - - -def _closing_refs(text: str) -> list[tuple[int, str]]: - refs: list[tuple[int, str]] = [] - for match in GITHUB_CLOSING_ISSUE_RE.finditer(text or ""): - issue_number = _int_or_none(match.group("issue")) - if issue_number is None or issue_number > MAX_BOUNTY_REF: - continue - refs.append((issue_number, f"{match.group('verb')} #{issue_number}")) - return refs - - -def analyze_closing_refs(data: dict[str, Any]) -> dict[str, Any]: - open_bounties = _open_public_bounty_numbers(data) - violations: list[dict[str, Any]] = [] - pull_requests = [item for item in data.get("pull_requests", []) if isinstance(item, dict)] - for pr in pull_requests: - number = _int_or_none(pr.get("number")) - if number is None: - continue - text = "\n".join(str(pr.get(key) or "") for key in ("title", "body")) - for issue_number, matched_reference in _closing_refs(text): - if issue_number not in open_bounties: - continue - violations.append( - { - "pull_request": number, - "title": str(pr.get("title") or ""), - "url": pr.get("url"), - "issue_number": issue_number, - "matched_reference": matched_reference, - "detail": ( - f"PR #{number} uses closing reference {matched_reference!r} " - f"against open public bounty #{issue_number}" - ), - } - ) - return { - "summary": { - "pull_requests": len(pull_requests), - "open_public_bounties": len(open_bounties), - "closing_references_to_open_bounties": len(violations), - }, - "violations": violations, - } - - -def has_violations(report: dict[str, Any]) -> bool: - return bool(report["violations"]) - - -def _single_line(value: Any) -> str: - return " ".join(str(value or "").split()) - - -def format_text_report(report: dict[str, Any]) -> str: - lines = ["Live bounty closing-reference check"] - for key, value in report["summary"].items(): - lines.append(f"- {key.replace('_', ' ')}: {value}") - if not has_violations(report): - lines.append("") - lines.append("No closing references to open public bounties found.") - return "\n".join(lines) - lines.append("") - lines.append("Closing references to open public bounties") - for item in report["violations"]: - lines.append( - "- PR #{pull_request}: {title} ({matched_reference} -> bounty #{issue_number})".format( - pull_request=item["pull_request"], - title=_single_line(item["title"]), - matched_reference=item["matched_reference"], - issue_number=item["issue_number"], - ) - ) - return "\n".join(lines) - - -def _run_gh_json(args: list[str]) -> Any: - command = " ".join(args) - try: - completed = subprocess.run( - args, - check=True, - capture_output=True, - text=True, - encoding="utf-8", - errors="replace", - timeout=GH_TIMEOUT_SECONDS, - ) - except subprocess.TimeoutExpired as exc: - raise RuntimeError(f"gh command timed out after {GH_TIMEOUT_SECONDS}s: {command}") from exc - except subprocess.CalledProcessError as exc: - raise RuntimeError( - "gh command failed " - f"(exit {exc.returncode}): {command}\n" - f"stdout:\n{exc.stdout or exc.output or ''}\n" - f"stderr:\n{exc.stderr or ''}" - ) from exc - return json.loads(completed.stdout) - - -def _fetch_json(url: str) -> Any: - request = urllib.request.Request(url, headers={"Accept": "application/json"}) - try: - with urllib.request.urlopen(request, timeout=GH_TIMEOUT_SECONDS) as response: - return json.loads(response.read().decode("utf-8")) - except (TimeoutError, urllib.error.URLError, json.JSONDecodeError) as exc: - raise RuntimeError(f"failed to fetch JSON from {url}: {exc}") from exc - - -def _load_public_bounties(api_host: str) -> list[dict[str, Any]]: - url = f"{api_host.rstrip('/')}/api/v1/bounties?status=open&limit=200" - data = _fetch_json(url) - if not isinstance(data, list): - raise RuntimeError(f"expected a JSON list from {url}") - return [item for item in data if isinstance(item, dict)] - - -def _load_pull_requests(repo: str, state: str, pr_numbers: list[int]) -> list[dict[str, Any]]: - if pr_numbers: - return [ - _run_gh_json( - [ - "gh", - "pr", - "view", - str(number), - "--repo", - repo, - "--json", - "number,title,url,body,state", - ] - ) - for number in pr_numbers - ] - prs = _run_gh_json( - [ - "gh", - "pr", - "list", - "--repo", - repo, - "--state", - state, - "--limit", - str(GH_PR_SAFETY_CAP), - "--json", - "number,title,url,body,state", - ] - ) - if len(prs) >= GH_PR_SAFETY_CAP: - raise RuntimeError( - f"gh pr list reached the {GH_PR_SAFETY_CAP} item safety cap; " - "use --pr for a bounded check or an API-paginated collector" - ) - return [item for item in prs if isinstance(item, dict)] - - -def load_live_data(repo: str, api_host: str, state: str, pr_numbers: list[int]) -> dict[str, Any]: - return { - "bounties": _load_public_bounties(api_host), - "pull_requests": _load_pull_requests(repo, state, pr_numbers), - } - - -def _load_input(path: str) -> dict[str, Any]: - with open(path, encoding="utf-8") as handle: - data = json.load(handle) - if not isinstance(data, dict): - raise ValueError("input must be a JSON object") - return data - - -def main(argv: list[str] | None = None) -> int: - parser = argparse.ArgumentParser( - description=( - "Fail when GitHub closing keywords target currently open public MergeWork " - "bounty issues." - ) - ) - source = parser.add_mutually_exclusive_group(required=True) - source.add_argument("--input", help="Read bounties and pull_requests from a JSON fixture.") - source.add_argument("--repo", help="GitHub repository, for example ramimbo/mergework.") - parser.add_argument("--api-host", type=public_api_host, default=DEFAULT_API_HOST) - parser.add_argument("--state", choices=["open", "closed", "merged", "all"], default="open") - parser.add_argument("--pr", type=int, action="append", default=[], help="Specific PR to check.") - parser.add_argument("--format", choices=["json", "text"], default="text") - parser.add_argument("--fail-on-issues", action="store_true") - args = parser.parse_args(argv) - - data = ( - _load_input(args.input) - if args.input - else load_live_data(args.repo, args.api_host, args.state, args.pr) - ) - report = analyze_closing_refs(data) - if args.format == "json": - print(json.dumps(report, indent=2, sort_keys=True)) - else: - print(format_text_report(report)) - return 1 if args.fail_on_issues and has_violations(report) else 0 - - -if __name__ == "__main__": - raise SystemExit(main(sys.argv[1:])) +from __future__ import annotations + +import argparse +import json +import subprocess +import sys +import urllib.error +import urllib.request +from pathlib import Path +from typing import Any + +if __package__ in {None, ""}: + sys.path.insert(0, str(Path(__file__).resolve().parents[1])) + +from scripts.api_host_args import public_api_host +from scripts.bounty_refs import GITHUB_CLOSING_ISSUE_RE + +GH_TIMEOUT_SECONDS = 30 + +DEFAULT_API_HOST = "https://api.mrwk.online" +GH_PR_SAFETY_CAP = 200 +MAX_BOUNTY_REF = 2**63 - 1 + + +def _int_or_none(value: Any) -> int | None: + try: + return int(value) + except (TypeError, ValueError): + return None + + +def _status_value(raw: dict[str, Any]) -> str: + return str(raw.get("status") or raw.get("state") or "").lower() + + +def _issue_number(raw: dict[str, Any]) -> int | None: + return _int_or_none(raw.get("issue_number", raw.get("number"))) + + +def _open_public_bounty_numbers(data: dict[str, Any]) -> set[int]: + numbers: set[int] = set() + for item in data.get("bounties", []): + if not isinstance(item, dict) or _status_value(item) != "open": + continue + issue_number = _issue_number(item) + if issue_number is not None: + numbers.add(issue_number) + return numbers + + +def _closing_refs(text: str) -> list[tuple[int, str]]: + refs: list[tuple[int, str]] = [] + for match in GITHUB_CLOSING_ISSUE_RE.finditer(text or ""): + issue_number = _int_or_none(match.group("issue")) + if issue_number is None or issue_number > MAX_BOUNTY_REF: + continue + refs.append((issue_number, f"{match.group('verb')} #{issue_number}")) + return refs + + +def analyze_closing_refs(data: dict[str, Any]) -> dict[str, Any]: + open_bounties = _open_public_bounty_numbers(data) + violations: list[dict[str, Any]] = [] + pull_requests = [item for item in data.get("pull_requests", []) if isinstance(item, dict)] + for pr in pull_requests: + number = _int_or_none(pr.get("number")) + if number is None: + continue + text = "\n".join(str(pr.get(key) or "") for key in ("title", "body")) + for issue_number, matched_reference in _closing_refs(text): + if issue_number not in open_bounties: + continue + violations.append( + { + "pull_request": number, + "title": str(pr.get("title") or ""), + "url": pr.get("url"), + "issue_number": issue_number, + "matched_reference": matched_reference, + "detail": ( + f"PR #{number} uses closing reference {matched_reference!r} " + f"against open public bounty #{issue_number}" + ), + } + ) + return { + "summary": { + "pull_requests": len(pull_requests), + "open_public_bounties": len(open_bounties), + "closing_references_to_open_bounties": len(violations), + }, + "violations": violations, + } + + +def has_violations(report: dict[str, Any]) -> bool: + return bool(report["violations"]) + + +def _single_line(value: Any) -> str: + return " ".join(str(value or "").split()) + + +def format_text_report(report: dict[str, Any]) -> str: + lines = ["Live bounty closing-reference check"] + for key, value in report["summary"].items(): + lines.append(f"- {key.replace('_', ' ')}: {value}") + if not has_violations(report): + lines.append("") + lines.append("No closing references to open public bounties found.") + return "\n".join(lines) + lines.append("") + lines.append("Closing references to open public bounties") + for item in report["violations"]: + lines.append( + "- PR #{pull_request}: {title} ({matched_reference} -> bounty #{issue_number})".format( + pull_request=item["pull_request"], + title=_single_line(item["title"]), + matched_reference=item["matched_reference"], + issue_number=item["issue_number"], + ) + ) + return "\n".join(lines) + + +def _run_gh_json(args: list[str]) -> Any: + command = " ".join(args) + try: + completed = subprocess.run( + args, + check=True, + capture_output=True, + text=True, + encoding="utf-8", + errors="replace", + timeout=GH_TIMEOUT_SECONDS, + ) + except subprocess.TimeoutExpired as exc: + raise RuntimeError(f"gh command timed out after {GH_TIMEOUT_SECONDS}s: {command}") from exc + except subprocess.CalledProcessError as exc: + raise RuntimeError( + "gh command failed " + f"(exit {exc.returncode}): {command}\n" + f"stdout:\n{exc.stdout or exc.output or ''}\n" + f"stderr:\n{exc.stderr or ''}" + ) from exc + return json.loads(completed.stdout) + + +def _fetch_json(url: str) -> Any: + request = urllib.request.Request(url, headers={"Accept": "application/json"}) + try: + with urllib.request.urlopen(request, timeout=GH_TIMEOUT_SECONDS) as response: + return json.loads(response.read().decode("utf-8")) + except (TimeoutError, urllib.error.URLError, json.JSONDecodeError) as exc: + raise RuntimeError(f"failed to fetch JSON from {url}: {exc}") from exc + + +def _load_public_bounties(api_host: str) -> list[dict[str, Any]]: + url = f"{api_host.rstrip('/')}/api/v1/bounties?status=open&limit=200" + data = _fetch_json(url) + if not isinstance(data, list): + raise RuntimeError(f"expected a JSON list from {url}") + return [item for item in data if isinstance(item, dict)] + + +def _load_pull_requests(repo: str, state: str, pr_numbers: list[int]) -> list[dict[str, Any]]: + if pr_numbers: + return [ + _run_gh_json( + [ + "gh", + "pr", + "view", + str(number), + "--repo", + repo, + "--json", + "number,title,url,body,state", + ] + ) + for number in pr_numbers + ] + prs = _run_gh_json( + [ + "gh", + "pr", + "list", + "--repo", + repo, + "--state", + state, + "--limit", + str(GH_PR_SAFETY_CAP), + "--json", + "number,title,url,body,state", + ] + ) + if len(prs) >= GH_PR_SAFETY_CAP: + raise RuntimeError( + f"gh pr list reached the {GH_PR_SAFETY_CAP} item safety cap; " + "use --pr for a bounded check or an API-paginated collector" + ) + return [item for item in prs if isinstance(item, dict)] + + +def load_live_data(repo: str, api_host: str, state: str, pr_numbers: list[int]) -> dict[str, Any]: + return { + "bounties": _load_public_bounties(api_host), + "pull_requests": _load_pull_requests(repo, state, pr_numbers), + } + + +def _load_input(path: str) -> dict[str, Any]: + with open(path, encoding="utf-8") as handle: + data = json.load(handle) + if not isinstance(data, dict): + raise ValueError("input must be a JSON object") + return data + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser( + description=( + "Fail when GitHub closing keywords target currently open public MergeWork " + "bounty issues." + ) + ) + source = parser.add_mutually_exclusive_group(required=True) + source.add_argument("--input", help="Read bounties and pull_requests from a JSON fixture.") + source.add_argument("--repo", help="GitHub repository, for example ramimbo/mergework.") + parser.add_argument("--api-host", type=public_api_host, default=DEFAULT_API_HOST) + parser.add_argument("--state", choices=["open", "closed", "merged", "all"], default="open") + parser.add_argument("--pr", type=int, action="append", default=[], help="Specific PR to check.") + parser.add_argument("--format", choices=["json", "text"], default="text") + parser.add_argument("--fail-on-issues", action="store_true") + args = parser.parse_args(argv) + + data = ( + _load_input(args.input) + if args.input + else load_live_data(args.repo, args.api_host, args.state, args.pr) + ) + report = analyze_closing_refs(data) + if args.format == "json": + print(json.dumps(report, indent=2, sort_keys=True)) + else: + print(format_text_report(report)) + return 1 if args.fail_on_issues and has_violations(report) else 0 + + +if __name__ == "__main__": + raise SystemExit(main(sys.argv[1:])) diff --git a/scripts/pr_queue_health.py b/scripts/pr_queue_health.py index cf39a90e..92e81cdb 100644 --- a/scripts/pr_queue_health.py +++ b/scripts/pr_queue_health.py @@ -1,462 +1,475 @@ -from __future__ import annotations - -import argparse -import json -import re -import subprocess -import sys -from collections import defaultdict -from pathlib import Path -from typing import Any - -if __package__ in {None, ""}: - sys.path.insert(0, str(Path(__file__).resolve().parents[1])) - -from scripts.bounty_refs import BOUNTY_REF_RE - -NOISY_TITLE_PREFIX_RE = re.compile(r"^\s*(?:\[[^\]]+\]\s*)+") -UNSTABLE_MERGE_STATES = {"blocked", "conflicting", "dirty", "unknown", "unstable"} -GH_TIMEOUT_SECONDS = 30 -GH_PR_SAFETY_CAP = 201 -GH_ISSUE_SAFETY_CAP = 201 -MAX_BOUNTY_REF = 2**63 - 1 -ISSUE_SECTIONS = ( - ("Closed or exhausted bounty references", "closed_bounty_references"), - ("Non-live bounty references", "non_live_bounty_references"), - ("Missing bounty references", "missing_bounty_references"), - ("Dirty or unstable merge state", "dirty_or_unstable_merge_state"), - ("Needs info", "needs_info"), -) - - -def _labels(raw: dict[str, Any]) -> list[str]: - labels = raw.get("labels", []) - names: list[str] = [] - for label in labels: - if isinstance(label, str): - names.append(label) - elif isinstance(label, dict) and isinstance(label.get("name"), str): - names.append(label["name"]) - return names - - -def _comments(raw: dict[str, Any]) -> list[str]: - comments = raw.get("comments", []) - bodies: list[str] = [] - for comment in comments: - if isinstance(comment, str): - bodies.append(comment) - elif isinstance(comment, dict) and isinstance(comment.get("body"), str): - bodies.append(comment["body"]) - return bodies - - -def _merge_state(raw: dict[str, Any]) -> str: - for key in ("merge_state", "mergeStateStatus", "mergeable", "mergeable_state"): - value = raw.get(key) - if isinstance(value, str) and value: - return value.lower() - return "unknown" - - -def _scope_key(raw: dict[str, Any]) -> str: - explicit = raw.get("scope") - if isinstance(explicit, str) and explicit.strip(): - return " ".join(explicit.lower().split()) - title = str(raw.get("title") or "") - title = NOISY_TITLE_PREFIX_RE.sub("", title) - return " ".join(title.lower().split()) - - -def _bounty_refs(raw: dict[str, Any]) -> list[int]: - explicit = raw.get("bounty_refs") - if isinstance(explicit, list): - refs = [item for item in explicit if isinstance(item, int)] - if refs: - return sorted(set(refs)) - text = "\n".join( - str(raw.get(key) or "") - for key in ("title", "body", "description") - if raw.get(key) is not None - ) - found_refs: set[int] = set() - for match in BOUNTY_REF_RE.findall(text): - try: - ref = int(match) - except ValueError: - continue - if ref <= MAX_BOUNTY_REF: - found_refs.add(ref) - return sorted(found_refs) - - -def _is_open_bounty(raw: dict[str, Any]) -> bool: - state = str(raw.get("state") or "").lower() - remaining = raw.get("awards_remaining", raw.get("awardsRemaining")) - if state and state != "open": - return False - if remaining is not None: - try: - return int(remaining) > 0 - except (TypeError, ValueError): - return False - return state == "open" - - -def _bounty_liveness(raw: dict[str, Any]) -> tuple[bool, str]: - if not _is_open_bounty(raw): - return False, "closed or exhausted" - if "labels" in raw and not any(label.lower() == "mrwk:bounty" for label in _labels(raw)): - return False, "missing mrwk:bounty label" - if "comments" in raw and not any("Reserved on MergeWork:" in body for body in _comments(raw)): - return False, "missing Reserved on MergeWork claims-open comment" - return True, "live" - - -def _issue(pr: dict[str, Any], reason: str, detail: str) -> dict[str, Any]: - return { - "pull_request": pr["number"], - "title": pr["title"], - "url": pr.get("url"), - "reason": reason, - "detail": detail, - } - - -def analyze_queue(data: dict[str, Any]) -> dict[str, Any]: - bounties = { - int(item["number"]): item - for item in data.get("bounties", []) - if isinstance(item, dict) and isinstance(item.get("number"), int) - } - prs = [item for item in data.get("pull_requests", []) if isinstance(item, dict)] - normalized_prs: list[dict[str, Any]] = [] - for pr in prs: - if not isinstance(pr.get("number"), int): - continue - normalized_prs.append( - { - "number": int(pr["number"]), - "title": str(pr.get("title") or ""), - "url": pr.get("url"), - "refs": _bounty_refs(pr), - "labels": _labels(pr), - "merge_state": _merge_state(pr), - "scope": _scope_key(pr), - } - ) - - closed_bounty_references: list[dict[str, Any]] = [] - non_live_bounty_references: list[dict[str, Any]] = [] - missing_bounty_references: list[dict[str, Any]] = [] - dirty_or_unstable_merge_state: list[dict[str, Any]] = [] - needs_info: list[dict[str, Any]] = [] - duplicate_groups: dict[tuple[int, str], list[int]] = defaultdict(list) - - for pr in normalized_prs: - if not pr["refs"]: - missing_bounty_references.append( - _issue( - pr, - "missing_bounty_reference", - "No bounty reference such as Bounty #, Refs #, " - "Fixes #, or /claim # found", - ) - ) - for ref in pr["refs"]: - bounty = bounties.get(ref) - if bounty is None: - closed_bounty_references.append( - _issue( - pr, - "unknown_bounty_reference", - f"Referenced bounty #{ref} was not in input", - ) - ) - elif not _is_open_bounty(bounty): - closed_bounty_references.append( - _issue( - pr, - "closed_or_exhausted_bounty", - f"Referenced bounty #{ref} is not payable", - ) - ) - else: - is_live, reason = _bounty_liveness(bounty) - if not is_live: - non_live_bounty_references.append( - _issue( - pr, - "non_live_bounty_reference", - f"Referenced bounty #{ref} is not live claimable: {reason}", - ) - ) - duplicate_groups[(ref, pr["scope"])].append(pr["number"]) - if pr["merge_state"] in UNSTABLE_MERGE_STATES: - dirty_or_unstable_merge_state.append( - _issue(pr, "dirty_or_unstable_merge_state", f"Merge state is {pr['merge_state']}") - ) - if any(label.lower() == "mrwk:needs-info" for label in pr["labels"]): - needs_info.append(_issue(pr, "mrwk_needs_info", "PR has mrwk:needs-info label")) - - duplicate_scope_groups = [ - {"bounty": bounty, "scope": scope, "pull_requests": sorted(numbers)} - for (bounty, scope), numbers in sorted(duplicate_groups.items()) - if len(numbers) > 1 and scope - ] - closed_or_exhausted_count = sum( - 1 for bounty in bounties.values() if not _is_open_bounty(bounty) - ) - live_bounty_count = sum(1 for bounty in bounties.values() if _bounty_liveness(bounty)[0]) - non_live_bounty_count = sum( - 1 - for bounty in bounties.values() - if _is_open_bounty(bounty) and not _bounty_liveness(bounty)[0] - ) - report = { - "summary": { - "pull_requests": len(normalized_prs), - "open_bounties": len(bounties) - closed_or_exhausted_count, - "live_bounties": live_bounty_count, - "non_live_bounties": non_live_bounty_count, - "closed_or_exhausted_bounties": closed_or_exhausted_count, - "closed_bounty_references": len(closed_bounty_references), - "non_live_bounty_references": len(non_live_bounty_references), - "missing_bounty_references": len(missing_bounty_references), - "dirty_or_unstable_merge_state": len(dirty_or_unstable_merge_state), - "needs_info": len(needs_info), - "duplicate_scope_groups": len(duplicate_scope_groups), - }, - "closed_bounty_references": closed_bounty_references, - "non_live_bounty_references": non_live_bounty_references, - "missing_bounty_references": missing_bounty_references, - "dirty_or_unstable_merge_state": dirty_or_unstable_merge_state, - "needs_info": needs_info, - "duplicate_scope_groups": duplicate_scope_groups, - } - return report - - -def has_queue_issues(report: dict[str, Any]) -> bool: - return any( - report[key] - for key in ( - "closed_bounty_references", - "non_live_bounty_references", - "missing_bounty_references", - "dirty_or_unstable_merge_state", - "needs_info", - "duplicate_scope_groups", - ) - ) - - -def format_text_report(report: dict[str, Any]) -> str: - lines = ["PR queue health summary"] - for key, value in report["summary"].items(): - lines.append(f"- {key.replace('_', ' ')}: {value}") - if not has_queue_issues(report): - lines.append("") - lines.append("No queue-health issues found.") - return "\n".join(lines) - for title, key in ISSUE_SECTIONS: - if report[key]: - lines.append("") - lines.append(title) - for item in report[key]: - lines.append(f"- PR #{item['pull_request']}: {item['title']} ({item['detail']})") - if report["duplicate_scope_groups"]: - lines.append("") - lines.append("Likely duplicate bounty scope") - for item in report["duplicate_scope_groups"]: - prs = ", ".join(f"#{number}" for number in item["pull_requests"]) - lines.append(f"- Bounty #{item['bounty']}: {item['scope']} ({prs})") - return "\n".join(lines) - - -def _single_line(value: Any) -> str: - return " ".join(str(value or "").split()) - - -def _markdown_pr_issue(item: dict[str, Any]) -> str: - pr_label = f"PR #{item['pull_request']}" - url = item.get("url") - if isinstance(url, str) and url: - pr_label = f"[{pr_label}]({url})" - return f"- {pr_label}: {_single_line(item['title'])} ({_single_line(item['detail'])})" - - -def format_markdown_report(report: dict[str, Any]) -> str: - lines = ["## PR Queue Health Summary", ""] - for key, value in report["summary"].items(): - lines.append(f"- **{key.replace('_', ' ')}**: {value}") - if not has_queue_issues(report): - lines.append("") - lines.append("No queue-health issues found.") - return "\n".join(lines) - - for title, key in ISSUE_SECTIONS: - if report[key]: - lines.append("") - lines.append(f"### {title}") - for item in report[key]: - lines.append(_markdown_pr_issue(item)) - if report["duplicate_scope_groups"]: - lines.append("") - lines.append("### Likely duplicate bounty scope") - for item in report["duplicate_scope_groups"]: - prs = ", ".join(f"#{number}" for number in item["pull_requests"]) - lines.append(f"- Bounty #{item['bounty']}: {_single_line(item['scope'])} ({prs})") - return "\n".join(lines) - - -def _run_gh_json(args: list[str]) -> Any: - command = " ".join(args) - try: - completed = subprocess.run( - args, - check=True, - capture_output=True, - text=True, - encoding="utf-8", - errors="replace", - timeout=GH_TIMEOUT_SECONDS, - ) - except subprocess.TimeoutExpired as exc: - raise RuntimeError(f"gh command timed out after {GH_TIMEOUT_SECONDS}s: {command}") from exc - except subprocess.CalledProcessError as exc: - raise RuntimeError( - "gh command failed " - f"(exit {exc.returncode}): {command}\n" - f"stdout:\n{exc.stdout or exc.output or ''}\n" - f"stderr:\n{exc.stderr or ''}" - ) from exc - return json.loads(completed.stdout) - - -def load_live_queue(repo: str) -> dict[str, Any]: - prs = _run_gh_json( - [ - "gh", - "pr", - "list", - "--repo", - repo, - "--state", - "open", - "--limit", - str(GH_PR_SAFETY_CAP), - "--json", - "number,title,url,body,labels,mergeStateStatus", - ] - ) - if len(prs) >= GH_PR_SAFETY_CAP: - raise RuntimeError( - f"gh pr list reached the {GH_PR_SAFETY_CAP} item safety cap; " - "use an API-paginated collector before trusting this live report" - ) - referenced_issues = sorted( - {ref for pr in prs if isinstance(pr, dict) for ref in _bounty_refs(pr)} - ) - referenced_issue_numbers = set(referenced_issues) - issues = _run_gh_json( - [ - "gh", - "issue", - "list", - "--repo", - repo, - "--state", - "all", - "--limit", - str(GH_ISSUE_SAFETY_CAP), - "--json", - "number,title,state,labels", - ] - ) - if len(issues) >= GH_ISSUE_SAFETY_CAP: - raise RuntimeError( - f"gh issue list reached the {GH_ISSUE_SAFETY_CAP} item safety cap; " - "use an API-paginated collector before trusting this live report" - ) - issues_by_number = { - int(issue["number"]): issue - for issue in issues - if isinstance(issue, dict) and isinstance(issue.get("number"), int) - } - bounty_numbers = { - int(issue["number"]) - for issue in issues - if isinstance(issue, dict) - and isinstance(issue.get("number"), int) - and "bounty" in str(issue.get("title", "")).lower() - } | referenced_issue_numbers - bounty_issues = [] - for number in sorted(bounty_numbers): - issue = issues_by_number.get(number, {"number": number}) - viewed_issue = issue - include_comments = number in referenced_issue_numbers - if include_comments: - try: - viewed_issue = _run_gh_json( - [ - "gh", - "issue", - "view", - str(number), - "--repo", - repo, - "--comments", - "--json", - "number,title,state,labels,comments", - ] - ) - except RuntimeError: - if number not in issues_by_number: - continue - bounty_issue = { - "number": int(viewed_issue["number"]), - "title": viewed_issue.get("title"), - "state": viewed_issue.get("state"), - "labels": viewed_issue.get("labels", []), - "awards_remaining": 1 if viewed_issue.get("state") == "OPEN" else 0, - } - if include_comments: - bounty_issue["comments"] = viewed_issue.get("comments", []) - bounty_issues.append(bounty_issue) - return {"pull_requests": prs, "bounties": bounty_issues} - - -def _load_input(path: str) -> dict[str, Any]: - with open(path, encoding="utf-8") as handle: - data = json.load(handle) - if not isinstance(data, dict): - raise ValueError("queue input must be a JSON object") - return data - - -def main(argv: list[str] | None = None) -> int: - parser = argparse.ArgumentParser(description="Summarize MergeWork open PR queue health.") - source = parser.add_mutually_exclusive_group(required=True) - source.add_argument("--input", help="Read queue data from a JSON fixture file.") - source.add_argument( - "--repo", - help="Collect live queue data with gh, for example ramimbo/mergework.", - ) - parser.add_argument("--format", choices=["json", "markdown", "text"], default="text") - parser.add_argument("--fail-on-issues", action="store_true") - args = parser.parse_args(argv) - - data = _load_input(args.input) if args.input else load_live_queue(args.repo) - report = analyze_queue(data) - if args.format == "json": - print(json.dumps(report, indent=2, sort_keys=True)) - elif args.format == "markdown": - print(format_markdown_report(report)) - else: - print(format_text_report(report)) - return 1 if args.fail_on_issues and has_queue_issues(report) else 0 - - -if __name__ == "__main__": - raise SystemExit(main(sys.argv[1:])) +from __future__ import annotations + +import argparse +import json +import re +import subprocess +import sys +from collections import defaultdict +from pathlib import Path +from typing import Any + +if __package__ in {None, ""}: + sys.path.insert(0, str(Path(__file__).resolve().parents[1])) + +from scripts.bounty_refs import BOUNTY_REF_RE + +GH_TIMEOUT_SECONDS = 30 + +NOISY_TITLE_PREFIX_RE = re.compile(r"^\s*(?:\[[^\]]+\]\s*)+") +UNSTABLE_MERGE_STATES = {"blocked", "conflicting", "dirty", "unknown", "unstable"} +GH_PR_SAFETY_CAP = 201 +GH_ISSUE_SAFETY_CAP = 201 +MAX_BOUNTY_REF = 2**63 - 1 +ISSUE_SECTIONS = ( + ("Closed or exhausted bounty references", "closed_bounty_references"), + ("Non-live bounty references", "non_live_bounty_references"), + ("Missing bounty references", "missing_bounty_references"), + ("Dirty or unstable merge state", "dirty_or_unstable_merge_state"), + ("Needs info", "needs_info"), +) + + +def _labels(raw: dict[str, Any]) -> list[str]: + labels = raw.get("labels", []) + names: list[str] = [] + for label in labels: + if isinstance(label, str): + names.append(label) + elif isinstance(label, dict) and isinstance(label.get("name"), str): + names.append(label["name"]) + return names + + +def _comments(raw: dict[str, Any]) -> list[str]: + comments = raw.get("comments", []) + bodies: list[str] = [] + for comment in comments: + if isinstance(comment, str): + bodies.append(comment) + elif isinstance(comment, dict) and isinstance(comment.get("body"), str): + bodies.append(comment["body"]) + return bodies + + +def _merge_state(raw: dict[str, Any]) -> str: + for key in ("merge_state", "mergeStateStatus", "mergeable", "mergeable_state"): + value = raw.get(key) + if isinstance(value, str) and value: + return value.lower() + return "unknown" + + +def _scope_key(raw: dict[str, Any]) -> str: + explicit = raw.get("scope") + if isinstance(explicit, str) and explicit.strip(): + return " ".join(explicit.lower().split()) + title = str(raw.get("title") or "") + title = NOISY_TITLE_PREFIX_RE.sub("", title) + return " ".join(title.lower().split()) + + +def _bounty_refs(raw: dict[str, Any]) -> list[int]: + explicit = raw.get("bounty_refs") + if isinstance(explicit, list): + refs = [item for item in explicit if isinstance(item, int)] + if refs: + return sorted(set(refs)) + text = "\n".join( + str(raw.get(key) or "") + for key in ("title", "body", "description") + if raw.get(key) is not None + ) + found_refs: set[int] = set() + for match in BOUNTY_REF_RE.findall(text): + try: + ref = int(match) + except ValueError: + continue + if ref <= MAX_BOUNTY_REF: + found_refs.add(ref) + return sorted(found_refs) + + +def _is_open_bounty(raw: dict[str, Any]) -> bool: + state = str(raw.get("state") or "").lower() + remaining = raw.get("awards_remaining", raw.get("awardsRemaining")) + if state and state != "open": + return False + if remaining is not None: + try: + return int(remaining) > 0 + except (TypeError, ValueError): + return False + return state == "open" + + +def _bounty_liveness(raw: dict[str, Any]) -> tuple[bool, str]: + if not _is_open_bounty(raw): + return False, "closed or exhausted" + if "labels" in raw and not any(label.lower() == "mrwk:bounty" for label in _labels(raw)): + return False, "missing mrwk:bounty label" + if "comments" in raw and not any("Reserved on MergeWork:" in body for body in _comments(raw)): + return False, "missing Reserved on MergeWork claims-open comment" + return True, "live" + + +def _issue(pr: dict[str, Any], reason: str, detail: str) -> dict[str, Any]: + return { + "pull_request": pr["number"], + "title": pr["title"], + "url": pr.get("url"), + "reason": reason, + "detail": detail, + } + + +def analyze_queue(data: dict[str, Any]) -> dict[str, Any]: + bounties = { + int(item["number"]): item + for item in data.get("bounties", []) + if isinstance(item, dict) and isinstance(item.get("number"), int) + } + prs = [item for item in data.get("pull_requests", []) if isinstance(item, dict)] + normalized_prs: list[dict[str, Any]] = [] + for pr in prs: + if not isinstance(pr.get("number"), int): + continue + normalized_prs.append( + { + "number": int(pr["number"]), + "title": str(pr.get("title") or ""), + "url": pr.get("url"), + "refs": _bounty_refs(pr), + "labels": _labels(pr), + "merge_state": _merge_state(pr), + "scope": _scope_key(pr), + } + ) + + closed_bounty_references: list[dict[str, Any]] = [] + non_live_bounty_references: list[dict[str, Any]] = [] + missing_bounty_references: list[dict[str, Any]] = [] + dirty_or_unstable_merge_state: list[dict[str, Any]] = [] + needs_info: list[dict[str, Any]] = [] + duplicate_groups: dict[tuple[int, str], list[int]] = defaultdict(list) + + for pr in normalized_prs: + if not pr["refs"]: + missing_bounty_references.append( + _issue( + pr, + "missing_bounty_reference", + "No bounty reference such as Bounty #, Refs #, " + "Fixes #, or /claim # found", + ) + ) + for ref in pr["refs"]: + bounty = bounties.get(ref) + if bounty is None: + closed_bounty_references.append( + _issue( + pr, + "unknown_bounty_reference", + f"Referenced bounty #{ref} was not in input", + ) + ) + elif not _is_open_bounty(bounty): + closed_bounty_references.append( + _issue( + pr, + "closed_or_exhausted_bounty", + f"Referenced bounty #{ref} is not payable", + ) + ) + else: + is_live, reason = _bounty_liveness(bounty) + if not is_live: + non_live_bounty_references.append( + _issue( + pr, + "non_live_bounty_reference", + f"Referenced bounty #{ref} is not live claimable: {reason}", + ) + ) + duplicate_groups[(ref, pr["scope"])].append(pr["number"]) + if pr["merge_state"] in UNSTABLE_MERGE_STATES: + dirty_or_unstable_merge_state.append( + _issue(pr, "dirty_or_unstable_merge_state", f"Merge state is {pr['merge_state']}") + ) + if any(label.lower() == "mrwk:needs-info" for label in pr["labels"]): + needs_info.append(_issue(pr, "mrwk_needs_info", "PR has mrwk:needs-info label")) + + duplicate_scope_groups = [ + {"bounty": bounty, "scope": scope, "pull_requests": sorted(numbers)} + for (bounty, scope), numbers in sorted(duplicate_groups.items()) + if len(numbers) > 1 and scope + ] + closed_or_exhausted_count = sum( + 1 for bounty in bounties.values() if not _is_open_bounty(bounty) + ) + live_bounty_count = sum(1 for bounty in bounties.values() if _bounty_liveness(bounty)[0]) + non_live_bounty_count = sum( + 1 + for bounty in bounties.values() + if _is_open_bounty(bounty) and not _bounty_liveness(bounty)[0] + ) + report = { + "summary": { + "pull_requests": len(normalized_prs), + "open_bounties": len(bounties) - closed_or_exhausted_count, + "live_bounties": live_bounty_count, + "non_live_bounties": non_live_bounty_count, + "closed_or_exhausted_bounties": closed_or_exhausted_count, + "closed_bounty_references": len(closed_bounty_references), + "non_live_bounty_references": len(non_live_bounty_references), + "missing_bounty_references": len(missing_bounty_references), + "dirty_or_unstable_merge_state": len(dirty_or_unstable_merge_state), + "needs_info": len(needs_info), + "duplicate_scope_groups": len(duplicate_scope_groups), + }, + "closed_bounty_references": closed_bounty_references, + "non_live_bounty_references": non_live_bounty_references, + "missing_bounty_references": missing_bounty_references, + "dirty_or_unstable_merge_state": dirty_or_unstable_merge_state, + "needs_info": needs_info, + "duplicate_scope_groups": duplicate_scope_groups, + } + return report + + +def has_queue_issues(report: dict[str, Any]) -> bool: + return any( + report[key] + for key in ( + "closed_bounty_references", + "non_live_bounty_references", + "missing_bounty_references", + "dirty_or_unstable_merge_state", + "needs_info", + "duplicate_scope_groups", + ) + ) + + +def format_text_report(report: dict[str, Any]) -> str: + lines = ["PR queue health summary"] + for key, value in report["summary"].items(): + lines.append(f"- {key.replace('_', ' ')}: {value}") + if not has_queue_issues(report): + lines.append("") + lines.append("No queue-health issues found.") + return "\n".join(lines) + for title, key in ISSUE_SECTIONS: + if report[key]: + lines.append("") + lines.append(title) + for item in report[key]: + lines.append(f"- PR #{item['pull_request']}: {item['title']} ({item['detail']})") + if report["duplicate_scope_groups"]: + lines.append("") + lines.append("Likely duplicate bounty scope") + for item in report["duplicate_scope_groups"]: + prs = ", ".join(f"#{number}" for number in item["pull_requests"]) + lines.append(f"- Bounty #{item['bounty']}: {item['scope']} ({prs})") + return "\n".join(lines) + + +def _single_line(value: Any) -> str: + return " ".join(str(value or "").split()) + + +def _markdown_pr_issue(item: dict[str, Any]) -> str: + pr_label = f"PR #{item['pull_request']}" + url = item.get("url") + if isinstance(url, str) and url: + pr_label = f"[{pr_label}]({url})" + return f"- {pr_label}: {_single_line(item['title'])} ({_single_line(item['detail'])})" + + +def format_markdown_report(report: dict[str, Any]) -> str: + lines = ["## PR Queue Health Summary", ""] + for key, value in report["summary"].items(): + lines.append(f"- **{key.replace('_', ' ')}**: {value}") + if not has_queue_issues(report): + lines.append("") + lines.append("No queue-health issues found.") + return "\n".join(lines) + + for title, key in ISSUE_SECTIONS: + if report[key]: + lines.append("") + lines.append(f"### {title}") + for item in report[key]: + lines.append(_markdown_pr_issue(item)) + if report["duplicate_scope_groups"]: + lines.append("") + lines.append("### Likely duplicate bounty scope") + for item in report["duplicate_scope_groups"]: + prs = ", ".join(f"#{number}" for number in item["pull_requests"]) + lines.append(f"- Bounty #{item['bounty']}: {_single_line(item['scope'])} ({prs})") + return "\n".join(lines) + + +def _run_gh_json(args: list[str]) -> Any: + command = " ".join(args) + try: + completed = subprocess.run( + args, + check=True, + capture_output=True, + text=True, + encoding="utf-8", + errors="replace", + timeout=GH_TIMEOUT_SECONDS, + ) + except subprocess.TimeoutExpired as exc: + raise RuntimeError(f"gh command timed out after {GH_TIMEOUT_SECONDS}s: {command}") from exc + except subprocess.CalledProcessError as exc: + raise RuntimeError( + "gh command failed " + f"(exit {exc.returncode}): {command}\n" + f"stdout:\n{exc.stdout or exc.output or ''}\n" + f"stderr:\n{exc.stderr or ''}" + ) from exc + return json.loads(completed.stdout) + + +def load_live_queue(repo: str) -> dict[str, Any]: + prs = _run_gh_json( + [ + "gh", + "pr", + "list", + "--repo", + repo, + "--state", + "open", + "--limit", + str(GH_PR_SAFETY_CAP), + "--json", + "number,title,url,body,labels,mergeStateStatus", + ] + ) + if len(prs) >= GH_PR_SAFETY_CAP: + raise RuntimeError( + f"gh pr list reached the {GH_PR_SAFETY_CAP} item safety cap; " + "use an API-paginated collector before trusting this live report" + ) + referenced_issues = sorted( + {ref for pr in prs if isinstance(pr, dict) for ref in _bounty_refs(pr)} + ) + referenced_issue_numbers = set(referenced_issues) + issues = _run_gh_json( + [ + "gh", + "issue", + "list", + "--repo", + repo, + "--state", + "all", + "--limit", + str(GH_ISSUE_SAFETY_CAP), + "--json", + "number,title,state,labels", + ] + ) + if len(issues) >= GH_ISSUE_SAFETY_CAP: + raise RuntimeError( + f"gh issue list reached the {GH_ISSUE_SAFETY_CAP} item safety cap; " + "use an API-paginated collector before trusting this live report" + ) + issues_by_number = { + int(issue["number"]): issue + for issue in issues + if isinstance(issue, dict) and isinstance(issue.get("number"), int) + } + bounty_numbers = { + int(issue["number"]) + for issue in issues + if isinstance(issue, dict) + and isinstance(issue.get("number"), int) + and "bounty" in str(issue.get("title", "")).lower() + } | referenced_issue_numbers + bounty_issues = [] + for number in sorted(bounty_numbers): + issue = issues_by_number.get(number, {"number": number}) + viewed_issue = issue + include_comments = number in referenced_issue_numbers + if include_comments: + try: + viewed_issue = _run_gh_json( + [ + "gh", + "issue", + "view", + str(number), + "--repo", + repo, + "--comments", + "--json", + "number,title,state,labels,comments", + ] + ) + except RuntimeError: + if number not in issues_by_number: + continue + bounty_issue = { + "number": int(viewed_issue["number"]), + "title": viewed_issue.get("title"), + "state": viewed_issue.get("state"), + "labels": viewed_issue.get("labels", []), + "awards_remaining": 1 if viewed_issue.get("state") == "OPEN" else 0, + } + if include_comments: + bounty_issue["comments"] = viewed_issue.get("comments", []) + bounty_issues.append(bounty_issue) + return {"pull_requests": prs, "bounties": bounty_issues} + + +def _load_input(path: str) -> dict[str, Any]: + with open(path, encoding="utf-8") as handle: + data = json.load(handle) + if not isinstance(data, dict): + raise ValueError("queue input must be a JSON object") + return data + + +def _require_non_empty_arg(parser: argparse.ArgumentParser, option_name: str, value: str) -> str: + stripped = value.strip() + if not stripped: + parser.error(f"{option_name} must be a non-empty value") + if stripped != value: + parser.error(f"{option_name} must not include leading or trailing whitespace") + return value + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(description="Summarize MergeWork open PR queue health.") + source = parser.add_mutually_exclusive_group(required=True) + source.add_argument("--input", help="Read queue data from a JSON fixture file.") + source.add_argument( + "--repo", + help="Collect live queue data with gh, for example ramimbo/mergework.", + ) + parser.add_argument("--format", choices=["json", "markdown", "text"], default="text") + parser.add_argument("--fail-on-issues", action="store_true") + args = parser.parse_args(argv) + + if args.input is not None: + data = _load_input(_require_non_empty_arg(parser, "--input", args.input)) + else: + data = load_live_queue(_require_non_empty_arg(parser, "--repo", args.repo)) + report = analyze_queue(data) + if args.format == "json": + print(json.dumps(report, indent=2, sort_keys=True)) + elif args.format == "markdown": + print(format_markdown_report(report)) + else: + print(format_text_report(report)) + return 1 if args.fail_on_issues and has_queue_issues(report) else 0 + + +if __name__ == "__main__": + raise SystemExit(main(sys.argv[1:])) diff --git a/scripts/review_bounty_candidates.py b/scripts/review_bounty_candidates.py index 46c833b5..5797caf8 100644 --- a/scripts/review_bounty_candidates.py +++ b/scripts/review_bounty_candidates.py @@ -1,384 +1,651 @@ -from __future__ import annotations - -import argparse -import json -import subprocess -import sys -from collections import Counter -from typing import Any - -DIRTY_MERGE_STATES = {"blocked", "conflicting", "dirty"} -GH_TIMEOUT_SECONDS = 30 -GH_PR_SAFETY_CAP = 201 -STANDARD_QUALITY_CHECK = "Quality, readiness, docs, and image checks" -HUMAN_REVIEW_STATES = {"APPROVED", "CHANGES_REQUESTED", "COMMENTED"} - - -def _login(raw: Any) -> str: - if isinstance(raw, str): - return raw.strip().lower() - if isinstance(raw, dict): - login = raw.get("login") - if isinstance(login, str): - return login.strip().lower() - return "" - - -def _display_login(raw: Any) -> str: - login = _login(raw) - return login or "unknown" - - -def _labels(raw: dict[str, Any]) -> list[str]: - labels = raw.get("labels", []) - names: list[str] = [] - if not isinstance(labels, list): - return names - for label in labels: - if isinstance(label, str): - names.append(label) - elif isinstance(label, dict) and isinstance(label.get("name"), str): - names.append(label["name"]) - return names - - -def _merge_state(raw: dict[str, Any]) -> str: - for key in ("merge_state", "mergeStateStatus", "mergeable", "mergeable_state"): - value = raw.get(key) - if isinstance(value, str) and value: - return value.lower() - return "unknown" - - -def _head_oid(raw: dict[str, Any]) -> str: - for key in ("headRefOid", "head_ref_oid", "head_sha", "head"): - value = raw.get(key) - if isinstance(value, str) and value: - return value - return "" - - -def _check_name(check: dict[str, Any]) -> str: - return str(check.get("name") or check.get("context") or check.get("workflowName") or "") - - -def _check_state(check: dict[str, Any]) -> str: - return str(check.get("conclusion") or check.get("state") or check.get("status") or "").upper() - - -def _standard_quality_state(raw: dict[str, Any]) -> str: - checks = raw.get("statusCheckRollup", raw.get("status_checks", [])) - if not isinstance(checks, list): - return "missing" - for check in checks: - if isinstance(check, dict) and _check_name(check) == STANDARD_QUALITY_CHECK: - state = _check_state(check) - if state in {"SUCCESS", "PASS"}: - return "success" - if state: - return state.lower() - return "pending" - return "missing" - - -def _review_commit(review: dict[str, Any]) -> str: - commit = review.get("commit") - if isinstance(commit, dict): - for key in ("oid", "sha"): - value = commit.get(key) - if isinstance(value, str) and value: - return value - for key in ("commit_id", "commitId", "commit_oid"): - value = review.get(key) - if isinstance(value, str) and value: - return value - return "" - - -def _is_bot_author(raw: Any) -> bool: - if isinstance(raw, dict): - if raw.get("is_bot") is True: - return True - login = _login(raw) - else: - login = _login(raw) - return login.endswith("[bot]") or login in {"coderabbitai", "github-actions"} - - -def _human_reviews(raw: dict[str, Any], pr_author: str) -> list[dict[str, Any]]: - reviews = raw.get("reviews", []) - if not isinstance(reviews, list): - return [] - useful: list[dict[str, Any]] = [] - for review in reviews: - if not isinstance(review, dict): - continue - author = review.get("author") - login = _login(author) - state = str(review.get("state") or "").upper() - if not login or login == pr_author or state not in HUMAN_REVIEW_STATES: - continue - if _is_bot_author(author): - continue - useful.append(review) - return useful - - -def _review_summary(review: dict[str, Any] | None) -> dict[str, str | None]: - if review is None: - return {"reviewer": None, "state": None, "commit": None} - return { - "reviewer": _display_login(review.get("author")), - "state": str(review.get("state") or "").upper() or None, - "commit": _review_commit(review) or None, - } - - -def _latest_review(reviews: list[dict[str, Any]]) -> dict[str, Any] | None: - if not reviews: - return None - return reviews[-1] - - -def _classify_pr( - raw: dict[str, Any], - *, - reviewer: str, - sufficient_reviews: int, -) -> dict[str, Any]: - number = int(raw["number"]) - title = str(raw.get("title") or "") - pr_author = _login(raw.get("author")) - labels = _labels(raw) - normalized_labels = {label.lower() for label in labels} - merge_state = _merge_state(raw) - head_oid = _head_oid(raw) - quality_state = _standard_quality_state(raw) - reviews = _human_reviews(raw, pr_author) - current_reviews = [review for review in reviews if _review_commit(review) == head_oid] - current_reviewer_reviews = [ - review for review in current_reviews if _login(review.get("author")) == reviewer - ] - reviewer_reviews = [review for review in reviews if _login(review.get("author")) == reviewer] - latest_human_review = _latest_review(reviews) - latest_reviewer_review = _latest_review(reviewer_reviews) - changes_requested = any( - str(review.get("state") or "").upper() == "CHANGES_REQUESTED" for review in current_reviews - ) - - state = "candidate_for_fresh_review" - reason = "no current-head human review found" - if pr_author == reviewer: - state = "self_authored" - reason = "reviewer authored this PR" - elif "mrwk:needs-info" in normalized_labels: - state = "needs_info" - reason = "PR has mrwk:needs-info label" - elif merge_state in DIRTY_MERGE_STATES: - state = "dirty_or_conflicted" - reason = f"merge state is {merge_state}" - elif quality_state != "success": - state = "missing_standard_quality_check" - reason = f"standard quality check is {quality_state}" - elif current_reviewer_reviews: - state = "already_reviewed_current_head_by_reviewer" - reason = "reviewer already reviewed current head" - elif changes_requested: - state = "waiting_for_author_update" - reason = "current-head human review already requested changes" - elif len(current_reviews) >= sufficient_reviews: - state = "already_has_sufficient_current_head_human_reviews" - reason = f"{len(current_reviews)} current-head human review(s) already present" - elif latest_reviewer_review is not None: - reason = "reviewer last reviewed an older head" - elif latest_human_review is not None: - reason = "latest useful human review is stale" - - return { - "pull_request": number, - "title": title, - "url": raw.get("url"), - "author": _display_login(raw.get("author")), - "state": state, - "reason": reason, - "headRefOid": head_oid or None, - "mergeStateStatus": merge_state, - "standard_quality_check": quality_state, - "labels": labels, - "current_head_human_reviews": len(current_reviews), - "latest_human_review": _review_summary(latest_human_review), - } - - -def analyze_candidates( - data: dict[str, Any], - *, - reviewer: str, - sufficient_reviews: int = 1, -) -> dict[str, Any]: - reviewer_login = reviewer.strip().lower() - if not reviewer_login: - raise ValueError("reviewer must not be empty") - if sufficient_reviews < 1: - raise ValueError("sufficient_reviews must be at least 1") - rows = [ - _classify_pr(raw, reviewer=reviewer_login, sufficient_reviews=sufficient_reviews) - for raw in data.get("pull_requests", []) - if isinstance(raw, dict) and isinstance(raw.get("number"), int) - ] - counts = Counter(row["state"] for row in rows) - return { - "reviewer": reviewer_login, - "summary": { - "pull_requests": len(rows), - **{key: counts.get(key, 0) for key in sorted(counts)}, - }, - "pull_requests": rows, - } - - -def _single_line(value: Any) -> str: - return " ".join(str(value or "").split()) - - -def format_text_report(report: dict[str, Any]) -> str: - lines = [f"Review bounty candidates for {report['reviewer']}"] - for key, value in report["summary"].items(): - lines.append(f"- {key.replace('_', ' ')}: {value}") - for row in report["pull_requests"]: - lines.append( - f"- PR #{row['pull_request']}: {row['state']} - " - f"{_single_line(row['title'])} ({_single_line(row['reason'])})" - ) - return "\n".join(lines) - - -def format_markdown_report(report: dict[str, Any]) -> str: - lines = [f"## Review Bounty Candidates For `{report['reviewer']}`", ""] - for key, value in report["summary"].items(): - lines.append(f"- **{key.replace('_', ' ')}**: {value}") - for row in report["pull_requests"]: - label = f"PR #{row['pull_request']}" - if row.get("url"): - label = f"[{label}]({row['url']})" - lines.append( - f"- {label}: `{row['state']}` - {_single_line(row['title'])} " - f"({_single_line(row['reason'])})" - ) - return "\n".join(lines) - - -def _run_gh_json(args: list[str]) -> Any: - command = " ".join(args) - try: - completed = subprocess.run( - args, - check=True, - capture_output=True, - text=True, - encoding="utf-8", - errors="replace", - timeout=GH_TIMEOUT_SECONDS, - ) - except subprocess.TimeoutExpired as exc: - raise RuntimeError(f"gh command timed out after {GH_TIMEOUT_SECONDS}s: {command}") from exc - except FileNotFoundError as exc: - raise RuntimeError( - "GitHub CLI executable 'gh' was not found; install gh and ensure it is on PATH " - "before using live --repo mode" - ) from exc - except subprocess.CalledProcessError as exc: - raise RuntimeError( - "gh command failed " - f"(exit {exc.returncode}): {command}\n" - f"stdout:\n{exc.stdout or exc.output or ''}\n" - f"stderr:\n{exc.stderr or ''}" - ) from exc - return json.loads(completed.stdout) - - -def load_live_candidates(repo: str) -> dict[str, Any]: - prs = _run_gh_json( - [ - "gh", - "pr", - "list", - "--repo", - repo, - "--state", - "open", - "--limit", - str(GH_PR_SAFETY_CAP), - "--json", - ",".join( - [ - "number", - "title", - "url", - "author", - "headRefOid", - "mergeStateStatus", - "labels", - "statusCheckRollup", - "reviews", - ] - ), - ] - ) - if len(prs) >= GH_PR_SAFETY_CAP: - raise RuntimeError( - f"gh pr list reached the {GH_PR_SAFETY_CAP} item safety cap; " - "use an API-paginated collector before trusting this live report" - ) - return {"pull_requests": prs} - - -def _load_input(path: str) -> dict[str, Any]: - with open(path, encoding="utf-8") as handle: - data = json.load(handle) - if not isinstance(data, dict): - raise ValueError("candidate input must be a JSON object") - return data - - -def _require_non_empty_arg(parser: argparse.ArgumentParser, option_name: str, value: str) -> str: - stripped = value.strip() - if not stripped: - parser.error(f"{option_name} must be a non-empty value") - if stripped != value: - parser.error(f"{option_name} must not include leading or trailing whitespace") - return value - - -def main(argv: list[str] | None = None) -> int: - parser = argparse.ArgumentParser( - description="Rank open PRs for reviewer-specific review-bounty work." - ) - source = parser.add_mutually_exclusive_group(required=True) - source.add_argument("--input", help="Read candidate data from a JSON fixture file.") - source.add_argument("--repo", help="Collect live open PR data with gh.") - parser.add_argument("--reviewer", required=True, help="GitHub login of the reviewer.") - parser.add_argument("--sufficient-reviews", type=int, default=1) - parser.add_argument("--format", choices=["json", "markdown", "text"], default="text") - args = parser.parse_args(argv) - - if args.input is not None: - data = _load_input(_require_non_empty_arg(parser, "--input", args.input)) - else: - data = load_live_candidates(_require_non_empty_arg(parser, "--repo", args.repo)) - report = analyze_candidates( - data, - reviewer=args.reviewer, - sufficient_reviews=args.sufficient_reviews, - ) - if args.format == "json": - print(json.dumps(report, indent=2, sort_keys=True)) - elif args.format == "markdown": - print(format_markdown_report(report)) - else: - print(format_text_report(report)) - return 0 - - -if __name__ == "__main__": - raise SystemExit(main(sys.argv[1:])) +from __future__ import annotations + +import argparse +import json +import re +import subprocess +import sys +from collections import Counter, defaultdict +from pathlib import Path +from typing import Any + +if __package__ in {None, ""}: + sys.path.insert(0, str(Path(__file__).resolve().parents[1])) + +GH_TIMEOUT_SECONDS = 30 + +DIRTY_MERGE_STATES = {"blocked", "conflicting", "dirty"} +GH_PR_SAFETY_CAP = 201 +STANDARD_QUALITY_CHECK = "Quality, readiness, docs, and image checks" +HUMAN_REVIEW_STATES = {"APPROVED", "CHANGES_REQUESTED", "COMMENTED"} +CLAIM_SIGNAL_RE = re.compile( + r"(^|\s)(/claim|claim(?:ing)?|reviewed|review bounty|evidence)(\b|\s|:)", + re.IGNORECASE, +) +PR_REVIEW_EVIDENCE_RE = re.compile( + r"https://github\.com/([^/]+)/([^/]+)/pull/(\d+)#pullrequestreview-\d+", + re.IGNORECASE, +) +PR_COMMENT_EVIDENCE_RE = re.compile( + r"https://github\.com/([^/]+)/([^/]+)/pull/(\d+)#issuecomment-\d+", + re.IGNORECASE, +) +PR_LINK_RE = re.compile( + r"https://github\.com/([^/]+)/([^/]+)/pull/(\d+)(?:\b|[^\d])", + re.IGNORECASE, +) +PR_NUMBER_REF_RE = re.compile(r"\bpull/(\d+)\b|\bPR\s+#(\d+)\b", re.IGNORECASE) +LABELED_HEAD_SHA_RE = re.compile( + r"(?:head(?:RefOid| ref| sha| oid)?|head)\s*[:=]\s*[`']?([0-9a-f]{40})[`']?", + re.IGNORECASE, +) +LABELED_BASE_SHA_RE = re.compile( + r"(?:base(?:RefOid| ref| sha| oid)?|origin/main|main sha|main)" + r"\s*[:=]\s*[`']?([0-9a-f]{40})[`']?", + re.IGNORECASE, +) +SATURATION_PROTECTED_STATES = {"self_authored", "needs_info"} + + +def _login(raw: Any) -> str: + if isinstance(raw, str): + return raw.strip().lower() + if isinstance(raw, dict): + login = raw.get("login") + if isinstance(login, str): + return login.strip().lower() + return "" + + +def _display_login(raw: Any) -> str: + login = _login(raw) + return login or "unknown" + + +def _labels(raw: dict[str, Any]) -> list[str]: + labels = raw.get("labels", []) + names: list[str] = [] + if not isinstance(labels, list): + return names + for label in labels: + if isinstance(label, str): + names.append(label) + elif isinstance(label, dict) and isinstance(label.get("name"), str): + names.append(label["name"]) + return names + + +def _merge_state(raw: dict[str, Any]) -> str: + for key in ("merge_state", "mergeStateStatus", "mergeable", "mergeable_state"): + value = raw.get(key) + if isinstance(value, str) and value: + return value.lower() + return "unknown" + + +def _head_oid(raw: dict[str, Any]) -> str: + for key in ("headRefOid", "head_ref_oid", "head_sha", "head"): + value = raw.get(key) + if isinstance(value, str) and value: + return value + return "" + + +def _base_oid(raw: dict[str, Any]) -> str: + for key in ("baseRefOid", "base_ref_oid", "base_sha", "base"): + value = raw.get(key) + if isinstance(value, str) and value: + return value + return "" + + +def _comment_url(comment: dict[str, Any]) -> str: + for key in ("html_url", "url"): + value = comment.get(key) + if isinstance(value, str) and value: + return value + return "" + + +def _repo_matches(owner: str, name: str, repo: str) -> bool: + try: + expected_owner, expected_name = repo.split("/", 1) + except ValueError: + return False + return owner.lower() == expected_owner.lower() and name.lower() == expected_name.lower() + + +def _parse_claim_comment(comment: dict[str, Any], *, repo: str) -> list[dict[str, Any]]: + body = str(comment.get("body") or "") + if not body.strip(): + return [] + if not ( + CLAIM_SIGNAL_RE.search(body) + or PR_REVIEW_EVIDENCE_RE.search(body) + or PR_COMMENT_EVIDENCE_RE.search(body) + ): + return [] + + pr_evidence: dict[int, tuple[str, str]] = {} + for match in PR_REVIEW_EVIDENCE_RE.finditer(body): + if _repo_matches(match.group(1), match.group(2), repo): + pr_evidence[int(match.group(3))] = (match.group(0), "pr_review") + for match in PR_COMMENT_EVIDENCE_RE.finditer(body): + if _repo_matches(match.group(1), match.group(2), repo): + pr = int(match.group(3)) + pr_evidence.setdefault(pr, (match.group(0), "pr_comment")) + for match in PR_LINK_RE.finditer(body): + if _repo_matches(match.group(1), match.group(2), repo): + pr = int(match.group(3)) + pr_evidence.setdefault(pr, (match.group(0).split("#", 1)[0], "pr_reference")) + for match in PR_NUMBER_REF_RE.finditer(body): + pr_str = match.group(1) or match.group(2) + if pr_str: + pr = int(pr_str) + pr_evidence.setdefault( + pr, + (f"https://github.com/{repo}/pull/{pr}", "pr_reference"), + ) + + if not pr_evidence: + return [] + + head_sha = None + base_sha = None + labeled_head = LABELED_HEAD_SHA_RE.search(body) + if labeled_head: + head_sha = labeled_head.group(1).lower() + labeled_base = LABELED_BASE_SHA_RE.search(body) + if labeled_base: + base_sha = labeled_base.group(1).lower() + + claim_url = _comment_url(comment) + claimant = _display_login(comment.get("author") or comment.get("user")) + submitted_at = comment.get("created_at") or comment.get("createdAt") + records: list[dict[str, Any]] = [] + for pr, (evidence_url, evidence_kind) in sorted(pr_evidence.items()): + records.append( + { + "pull_request": pr, + "claim_url": claim_url, + "evidence_url": evidence_url, + "evidence_kind": evidence_kind, + "head_sha": head_sha, + "base_sha": base_sha, + "claimant": claimant, + "submitted_at": submitted_at, + } + ) + return records + + +def index_bounty_claims(comments: list[Any], *, repo: str) -> dict[int, list[dict[str, Any]]]: + by_pr: dict[int, list[dict[str, Any]]] = defaultdict(list) + for comment in comments: + if not isinstance(comment, dict): + continue + for record in _parse_claim_comment(comment, repo=repo): + by_pr[int(record["pull_request"])].append(record) + return dict(by_pr) + + +def _check_name(check: dict[str, Any]) -> str: + return str(check.get("name") or check.get("context") or check.get("workflowName") or "") + + +def _check_state(check: dict[str, Any]) -> str: + return str(check.get("conclusion") or check.get("state") or check.get("status") or "").upper() + + +def _standard_quality_state(raw: dict[str, Any]) -> str: + checks = raw.get("statusCheckRollup", raw.get("status_checks", [])) + if not isinstance(checks, list): + return "missing" + for check in checks: + if isinstance(check, dict) and _check_name(check) == STANDARD_QUALITY_CHECK: + state = _check_state(check) + if state in {"SUCCESS", "PASS"}: + return "success" + if state: + return state.lower() + return "pending" + return "missing" + + +def _review_commit(review: dict[str, Any]) -> str: + commit = review.get("commit") + if isinstance(commit, dict): + for key in ("oid", "sha"): + value = commit.get(key) + if isinstance(value, str) and value: + return value + for key in ("commit_id", "commitId", "commit_oid"): + value = review.get(key) + if isinstance(value, str) and value: + return value + return "" + + +def _is_bot_author(raw: Any) -> bool: + if isinstance(raw, dict): + if raw.get("is_bot") is True: + return True + login = _login(raw) + else: + login = _login(raw) + return login.endswith("[bot]") or login in {"coderabbitai", "github-actions"} + + +def _human_reviews(raw: dict[str, Any], pr_author: str) -> list[dict[str, Any]]: + reviews = raw.get("reviews", []) + if not isinstance(reviews, list): + return [] + useful: list[dict[str, Any]] = [] + for review in reviews: + if not isinstance(review, dict): + continue + author = review.get("author") + login = _login(author) + state = str(review.get("state") or "").upper() + if not login or login == pr_author or state not in HUMAN_REVIEW_STATES: + continue + if _is_bot_author(author): + continue + useful.append(review) + return useful + + +def _review_summary(review: dict[str, Any] | None) -> dict[str, str | None]: + if review is None: + return {"reviewer": None, "state": None, "commit": None} + return { + "reviewer": _display_login(review.get("author")), + "state": str(review.get("state") or "").upper() or None, + "commit": _review_commit(review) or None, + } + + +def _latest_review(reviews: list[dict[str, Any]]) -> dict[str, Any] | None: + if not reviews: + return None + return reviews[-1] + + +def _classify_pr( + raw: dict[str, Any], + *, + reviewer: str, + sufficient_reviews: int, +) -> dict[str, Any]: + number = int(raw["number"]) + title = str(raw.get("title") or "") + pr_author = _login(raw.get("author")) + labels = _labels(raw) + normalized_labels = {label.lower() for label in labels} + merge_state = _merge_state(raw) + head_oid = _head_oid(raw) + quality_state = _standard_quality_state(raw) + reviews = _human_reviews(raw, pr_author) + current_reviews = [review for review in reviews if _review_commit(review) == head_oid] + current_reviewer_reviews = [ + review for review in current_reviews if _login(review.get("author")) == reviewer + ] + reviewer_reviews = [review for review in reviews if _login(review.get("author")) == reviewer] + latest_human_review = _latest_review(reviews) + latest_reviewer_review = _latest_review(reviewer_reviews) + changes_requested = any( + str(review.get("state") or "").upper() == "CHANGES_REQUESTED" for review in current_reviews + ) + + state = "candidate_for_fresh_review" + reason = "no current-head human review found" + if pr_author == reviewer: + state = "self_authored" + reason = "reviewer authored this PR" + elif "mrwk:needs-info" in normalized_labels: + state = "needs_info" + reason = "PR has mrwk:needs-info label" + elif merge_state in DIRTY_MERGE_STATES: + state = "dirty_or_conflicted" + reason = f"merge state is {merge_state}" + elif quality_state != "success": + state = "missing_standard_quality_check" + reason = f"standard quality check is {quality_state}" + elif current_reviewer_reviews: + state = "already_reviewed_current_head_by_reviewer" + reason = "reviewer already reviewed current head" + elif changes_requested: + state = "waiting_for_author_update" + reason = "current-head human review already requested changes" + elif len(current_reviews) >= sufficient_reviews: + state = "already_has_sufficient_current_head_human_reviews" + reason = f"{len(current_reviews)} current-head human review(s) already present" + elif latest_reviewer_review is not None: + reason = "reviewer last reviewed an older head" + elif latest_human_review is not None: + reason = "latest useful human review is stale" + + return { + "pull_request": number, + "title": title, + "url": raw.get("url"), + "author": _display_login(raw.get("author")), + "state": state, + "reason": reason, + "headRefOid": head_oid or None, + "baseRefOid": _base_oid(raw) or None, + "mergeStateStatus": merge_state, + "standard_quality_check": quality_state, + "labels": labels, + "current_head_human_reviews": len(current_reviews), + "latest_human_review": _review_summary(latest_human_review), + } + + +def _attach_claim_metadata(row: dict[str, Any], claims: list[dict[str, Any]]) -> None: + if not claims: + return + row["bounty_claims"] = [ + {key: value for key, value in claim.items() if key != "pull_request"} for claim in claims + ] + row["matched_claim_urls"] = [ + str(claim["claim_url"]) for claim in claims if claim.get("claim_url") + ] + latest = claims[-1] + if latest.get("evidence_kind"): + row["claim_evidence_kind"] = latest["evidence_kind"] + if latest.get("evidence_url"): + row["matched_evidence_urls"] = [ + str(claim["evidence_url"]) for claim in claims if claim.get("evidence_url") + ] + + +def _apply_bounty_saturation( + row: dict[str, Any], + *, + claims: list[dict[str, Any]], + merge_state: str, + head_oid: str, + base_oid: str, +) -> dict[str, Any]: + if not claims: + if merge_state in DIRTY_MERGE_STATES and row["state"] == "dirty_or_conflicted": + row["state"] = "dirty_unclaimed_current_base_candidate" + row["reason"] = ( + "dirty/conflicted PR has no review-bounty claim; " + "current-base follow-up may be useful" + ) + return row + + _attach_claim_metadata(row, claims) + if row["state"] in SATURATION_PROTECTED_STATES: + return row + + latest = claims[-1] + claim_head = str(latest.get("head_sha") or "").lower() + claim_base = str(latest.get("base_sha") or "").lower() + head = head_oid.lower() + base = base_oid.lower() + evidence_kind = str(latest.get("evidence_kind") or "") + + stale = False + stale_reason = "review-bounty claim may be stale" + if claim_head and head and claim_head != head: + stale = True + stale_reason = f"claim head {claim_head[:7]} differs from current head {head[:7]}" + elif claim_base and base and claim_base != base: + stale = True + stale_reason = f"claim base {claim_base[:7]} differs from current base {base[:7]}" + elif merge_state in DIRTY_MERGE_STATES and claim_head and head and claim_head == head: + stale = True + stale_reason = "PR is dirty/conflicted after a clean-current-head bounty claim" + + if stale: + row["state"] = "claimed_stale_head_or_base" + row["reason"] = stale_reason + elif evidence_kind == "pr_comment": + row["state"] = "claimed_by_pr_comment" + row["reason"] = "review-bounty claim uses PR comment evidence" + elif claim_head and head and claim_head == head: + row["state"] = "already_claimed_current_head" + row["reason"] = "review-bounty claim matches current PR head" + else: + row["state"] = "already_claimed_on_bounty_issue" + row["reason"] = "PR already referenced on review bounty issue" + return row + + +def analyze_candidates( + data: dict[str, Any], + *, + reviewer: str, + sufficient_reviews: int = 1, + repo: str | None = None, +) -> dict[str, Any]: + reviewer_login = reviewer.strip().lower() + if not reviewer_login: + raise ValueError("reviewer must not be empty") + if sufficient_reviews < 1: + raise ValueError("sufficient_reviews must be at least 1") + + effective_repo = repo or (data.get("repo") if isinstance(data.get("repo"), str) else None) + claims_by_pr: dict[int, list[dict[str, Any]]] = {} + raw_comments = data.get("bounty_claim_comments") + saturation_enabled = isinstance(effective_repo, str) and "bounty_claim_comments" in data + if saturation_enabled and isinstance(raw_comments, list): + claims_by_pr = index_bounty_claims(raw_comments, repo=effective_repo) + + rows: list[dict[str, Any]] = [] + for raw in data.get("pull_requests", []): + if not isinstance(raw, dict) or not isinstance(raw.get("number"), int): + continue + row = _classify_pr(raw, reviewer=reviewer_login, sufficient_reviews=sufficient_reviews) + if saturation_enabled: + row = _apply_bounty_saturation( + row, + claims=claims_by_pr.get(int(raw["number"]), []), + merge_state=_merge_state(raw), + head_oid=_head_oid(raw), + base_oid=_base_oid(raw), + ) + rows.append(row) + + counts = Counter(row["state"] for row in rows) + report: dict[str, Any] = { + "reviewer": reviewer_login, + "summary": { + "pull_requests": len(rows), + **{key: counts.get(key, 0) for key in sorted(counts)}, + }, + "pull_requests": rows, + } + if effective_repo and saturation_enabled: + report["bounty_issue_claims_indexed"] = sum(len(items) for items in claims_by_pr.values()) + return report + + +def _single_line(value: Any) -> str: + return " ".join(str(value or "").split()) + + +def format_text_report(report: dict[str, Any]) -> str: + lines = [f"Review bounty candidates for {report['reviewer']}"] + for key, value in report["summary"].items(): + lines.append(f"- {key.replace('_', ' ')}: {value}") + for row in report["pull_requests"]: + claim_note = "" + if row.get("matched_claim_urls"): + claim_note = f" claims={','.join(row['matched_claim_urls'])}" + lines.append( + f"- PR #{row['pull_request']}: {row['state']} - " + f"{_single_line(row['title'])} ({_single_line(row['reason'])}){claim_note}" + ) + return "\n".join(lines) + + +def format_markdown_report(report: dict[str, Any]) -> str: + lines = [f"## Review Bounty Candidates For `{report['reviewer']}`", ""] + for key, value in report["summary"].items(): + lines.append(f"- **{key.replace('_', ' ')}**: {value}") + for row in report["pull_requests"]: + label = f"PR #{row['pull_request']}" + if row.get("url"): + label = f"[{label}]({row['url']})" + claim_note = "" + if row.get("matched_claim_urls"): + urls = ", ".join(f"`{url}`" for url in row["matched_claim_urls"]) + claim_note = f" Claims: {urls}." + lines.append( + f"- {label}: `{row['state']}` - {_single_line(row['title'])} " + f"({_single_line(row['reason'])}){claim_note}" + ) + return "\n".join(lines) + + +def _run_gh_json(args: list[str]) -> Any: + command = " ".join(args) + try: + completed = subprocess.run( + args, + check=True, + capture_output=True, + text=True, + encoding="utf-8", + errors="replace", + timeout=GH_TIMEOUT_SECONDS, + ) + except subprocess.TimeoutExpired as exc: + raise RuntimeError(f"gh command timed out after {GH_TIMEOUT_SECONDS}s: {command}") from exc + except FileNotFoundError as exc: + raise RuntimeError( + "GitHub CLI executable 'gh' was not found; install gh and ensure it is on PATH " + "before using live --repo mode" + ) from exc + except subprocess.CalledProcessError as exc: + raise RuntimeError( + "gh command failed " + f"(exit {exc.returncode}): {command}\n" + f"stdout:\n{exc.stdout or exc.output or ''}\n" + f"stderr:\n{exc.stderr or ''}" + ) from exc + return json.loads(completed.stdout) + + +def load_live_candidates(repo: str, *, bounty_issue: int | None = None) -> dict[str, Any]: + prs = _run_gh_json( + [ + "gh", + "pr", + "list", + "--repo", + repo, + "--state", + "open", + "--limit", + str(GH_PR_SAFETY_CAP), + "--json", + ",".join( + [ + "number", + "title", + "url", + "author", + "headRefOid", + "baseRefOid", + "mergeStateStatus", + "labels", + "statusCheckRollup", + "reviews", + ] + ), + ] + ) + if len(prs) >= GH_PR_SAFETY_CAP: + raise RuntimeError( + f"gh pr list reached the {GH_PR_SAFETY_CAP} item safety cap; " + "use an API-paginated collector before trusting this live report" + ) + data: dict[str, Any] = {"repo": repo, "pull_requests": prs} + if bounty_issue is not None: + issue = _run_gh_json( + [ + "gh", + "issue", + "view", + str(bounty_issue), + "--repo", + repo, + "--json", + "comments", + ] + ) + comments = issue.get("comments", []) if isinstance(issue, dict) else [] + if not isinstance(comments, list): + comments = [] + data["bounty_claim_comments"] = comments + return data + + +def _load_input(path: str) -> dict[str, Any]: + with open(path, encoding="utf-8") as handle: + data = json.load(handle) + if not isinstance(data, dict): + raise ValueError("candidate input must be a JSON object") + return data + + +def _require_non_empty_arg(parser: argparse.ArgumentParser, option_name: str, value: str) -> str: + stripped = value.strip() + if not stripped: + parser.error(f"{option_name} must be a non-empty value") + if stripped != value: + parser.error(f"{option_name} must not include leading or trailing whitespace") + return value + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser( + description="Rank open PRs for reviewer-specific review-bounty work." + ) + source = parser.add_mutually_exclusive_group(required=True) + source.add_argument("--input", help="Read candidate data from a JSON fixture file.") + source.add_argument("--repo", help="Collect live open PR data with gh.") + parser.add_argument("--reviewer", required=True, help="GitHub login of the reviewer.") + parser.add_argument( + "--bounty-issue", + type=int, + help="Active review-bounty issue number for claim saturation (live --repo mode only).", + ) + parser.add_argument("--sufficient-reviews", type=int, default=1) + parser.add_argument("--format", choices=["json", "markdown", "text"], default="text") + args = parser.parse_args(argv) + + if args.input is not None and args.bounty_issue is not None: + parser.error("--bounty-issue is only valid in live --repo mode") + + if args.input is not None: + data = _load_input(_require_non_empty_arg(parser, "--input", args.input)) + else: + if args.bounty_issue is not None and args.bounty_issue < 1: + parser.error("--bounty-issue must be a positive integer") + data = load_live_candidates( + _require_non_empty_arg(parser, "--repo", args.repo), + bounty_issue=args.bounty_issue, + ) + report = analyze_candidates( + data, + reviewer=args.reviewer, + sufficient_reviews=args.sufficient_reviews, + repo=data.get("repo") if isinstance(data.get("repo"), str) else None, + ) + if args.format == "json": + print(json.dumps(report, indent=2, sort_keys=True)) + elif args.format == "markdown": + print(format_markdown_report(report)) + else: + print(format_text_report(report)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main(sys.argv[1:])) diff --git a/scripts/submission_quality_gate.py b/scripts/submission_quality_gate.py index 7e0ae856..40420bcd 100644 --- a/scripts/submission_quality_gate.py +++ b/scripts/submission_quality_gate.py @@ -1,806 +1,807 @@ -from __future__ import annotations - -import argparse -import json -import re -import subprocess -import sys -from datetime import UTC, datetime, timedelta -from difflib import SequenceMatcher -from pathlib import Path -from typing import Any -from urllib.error import HTTPError, URLError -from urllib.request import urlopen - -if __package__ in {None, ""}: - sys.path.insert(0, str(Path(__file__).resolve().parents[1])) - -from scripts.api_host_args import public_api_host -from scripts.bounty_refs import BOUNTY_REF_RE, GITHUB_LINKED_ISSUE_RE, LEADING_BOUNTY_REF_RE - - -def _non_negative_int(value: str) -> int: - """Argparse type that rejects negative --max-maintainer-age-days values. - - A negative threshold makes any maintainer activity look stale (the delta - always exceeds a negative window), so the gate emits a misleading - "stale activity" warning instead of flagging the invalid input. - """ - try: - parsed = int(value) - except (TypeError, ValueError): - raise argparse.ArgumentTypeError(f"expected an integer, got {value!r}") from None - if parsed < 0: - raise argparse.ArgumentTypeError(f"must be >= 0, got {parsed}") - return parsed - - -EVIDENCE_RE = re.compile( - r"\b(pytest|ruff|mypy|validation|verified|test evidence|checks? passed)\b", - re.IGNORECASE, -) -SUMMARY_RE = re.compile(r"\b(summary|what changed|changes?)\b", re.IGNORECASE) -GH_TIMEOUT_SECONDS = 30 -DEFAULT_API_HOST = "https://api.mrwk.online" -DEFAULT_MAX_MAINTAINER_AGE_DAYS = 14 -GH_PR_SAFETY_CAP = 101 -GH_ISSUE_SAFETY_CAP = 201 -MAINTAINER_ASSOCIATIONS = {"OWNER", "MEMBER", "COLLABORATOR"} -MAX_BOUNTY_REF = 2**63 - 1 -EFFECTIVE_AVAILABILITY_FIELDS = ( - "effective_awards_remaining", - "effective_available_mrwk", - "availability_state", - "availability_note", - "pending_payout_awards", -) - - -def _check(name: str, status: str, message: str) -> dict[str, str]: - return {"name": name, "status": status, "message": message} - - -def _first_present(raw: dict[str, Any], *names: str) -> Any: - for name in names: - if name in raw: - return raw.get(name) - return None - - -def _int_or_none(value: Any) -> int | None: - try: - return int(value) - except (TypeError, ValueError): - return None - - -def _extract_issue_refs(text: str, pattern: re.Pattern[str]) -> list[int]: - refs: list[int] = [] - seen: set[int] = set() - for match in pattern.findall(text): - try: - ref = int(match) - except ValueError: - continue - if ref > MAX_BOUNTY_REF: - continue - if ref in seen: - continue - seen.add(ref) - refs.append(ref) - return refs - - -def _bounty_refs(text: str) -> list[int]: - return _extract_issue_refs(text, BOUNTY_REF_RE) - - -def _github_linked_issue_refs(text: str) -> list[int]: - return _extract_issue_refs(text, GITHUB_LINKED_ISSUE_RE) - - -def _bounty_has_open_state(raw: dict[str, Any]) -> bool: - return str(_first_present(raw, "state", "status") or "").lower() in {"", "open"} - - -def _effective_awards_remaining(raw: dict[str, Any]) -> int | None: - value = _first_present(raw, "effective_awards_remaining", "effectiveAwardsRemaining") - if value is None: - return None - return _int_or_none(value) - - -def _availability_note_suffix(raw: dict[str, Any]) -> str: - note = str(_first_present(raw, "availability_note", "availabilityNote") or "").strip() - return f" ({note})" if note else "" - - -def _bounty_is_payable(raw: dict[str, Any]) -> bool: - if not _bounty_has_open_state(raw): - return False - effective_remaining = _effective_awards_remaining(raw) - if effective_remaining is not None: - return effective_remaining > 0 - remaining = raw.get("awards_remaining", raw.get("awardsRemaining")) - if remaining is None: - return True - parsed_remaining = _int_or_none(remaining) - return parsed_remaining is not None and parsed_remaining > 0 - - -def _bounty_payability_fail_message(bounty_ref: int, raw: dict[str, Any]) -> str: - if _bounty_has_open_state(raw) and _effective_awards_remaining(raw) is not None: - return ( - f"referenced bounty #{bounty_ref} has no effective awards remaining" - f"{_availability_note_suffix(raw)}" - ) - return f"referenced bounty #{bounty_ref} is closed or exhausted{_availability_note_suffix(raw)}" - - -def _bounty_payability_pass_message(bounty_ref: int, raw: dict[str, Any]) -> str: - effective_remaining = _effective_awards_remaining(raw) - if effective_remaining is None: - return f"referenced bounty #{bounty_ref} is open{_availability_note_suffix(raw)}" - return ( - f"referenced bounty #{bounty_ref} is open with {effective_remaining} " - f"effective award(s) remaining{_availability_note_suffix(raw)}" - ) - - -def _bounty_availability_warning(bounty_ref: int, raw: dict[str, Any]) -> dict[str, str] | None: - if not _bounty_has_open_state(raw): - return None - availability_state = str( - _first_present(raw, "availability_state", "availabilityState") or "" - ).lower() - pending_payout_awards = _int_or_none( - _first_present(raw, "pending_payout_awards", "pendingPayoutAwards") - ) - has_pending_payouts = pending_payout_awards is not None and pending_payout_awards > 0 - has_partial_state = availability_state == "pending_payouts_partial" - if not (has_pending_payouts or has_partial_state): - return None - if _effective_awards_remaining(raw) == 0: - return None - return _check( - "bounty_availability", - "warn", - f"referenced bounty #{bounty_ref} has reduced effective capacity" - f"{_availability_note_suffix(raw)}", - ) - - -def _bounty_payability_verified(raw: dict[str, Any]) -> bool: - return raw.get("payability_verified", True) is not False - - -def _active_attempts_verified(raw: dict[str, Any]) -> bool: - return raw.get("active_attempts_verified", True) is not False - - -def _copy_effective_availability_fields(source: dict[str, Any], target: dict[str, Any]) -> None: - for field in EFFECTIVE_AVAILABILITY_FIELDS: - if field in source: - target[field] = source[field] - - -def _safe_attempts(raw: dict[str, Any]) -> list[dict[str, Any]]: - attempts = raw.get("active_attempts", []) - if not isinstance(attempts, list): - return [] - return [attempt for attempt in attempts if isinstance(attempt, dict)] - - -def _attempt_field(attempt: dict[str, Any], *names: str) -> Any: - for name in names: - value = attempt.get(name) - if value not in (None, ""): - return value - return None - - -def _format_attempt_summary(attempt: dict[str, Any]) -> str: - parts: list[str] = [] - submitter = _attempt_field(attempt, "submitter", "submitter_account", "account", "github_login") - if submitter: - parts.append(f"submitter={submitter}") - source_url = _attempt_field(attempt, "source_url", "public_source_url", "url") - if source_url: - parts.append(f"source={source_url}") - status = _attempt_field(attempt, "status") - if status: - parts.append(f"status={status}") - expires_at = _attempt_field(attempt, "expires_at", "expiresAt", "expiry_time") - if expires_at: - parts.append(f"expires={expires_at}") - return ", ".join(parts) or "active attempt" - - -def _parse_datetime(value: Any) -> datetime | None: - if not isinstance(value, str) or not value: - return None - try: - parsed = datetime.fromisoformat(value.replace("Z", "+00:00")) - except ValueError: - return None - if parsed.tzinfo is None: - parsed = parsed.replace(tzinfo=UTC) - return parsed.astimezone(UTC) - - -def _isoformat_utc(value: datetime) -> str: - return value.astimezone(UTC).isoformat().replace("+00:00", "Z") - - -def _current_time(data: dict[str, Any]) -> datetime: - return _parse_datetime(data.get("now")) or datetime.now(UTC) - - -def _maintainer_activity_check( - bounty_ref: int, bounty: dict[str, Any], now: datetime -) -> dict[str, str] | None: - if "last_maintainer_activity_at" not in bounty and "maintainer_activity_verified" not in bounty: - return None - if bounty.get("maintainer_activity_verified") is False: - return _check( - "maintainer_activity", - "warn", - f"recent maintainer activity for bounty #{bounty_ref} could not be verified", - ) - last_activity = _parse_datetime(bounty.get("last_maintainer_activity_at")) - if last_activity is None: - return _check( - "maintainer_activity", - "warn", - f"recent maintainer activity for bounty #{bounty_ref} could not be verified", - ) - try: - max_age_days = int(bounty.get("max_maintainer_age_days", DEFAULT_MAX_MAINTAINER_AGE_DAYS)) - except (TypeError, ValueError): - return _check( - "maintainer_activity", - "warn", - f"recent maintainer activity for bounty #{bounty_ref} could not be verified", - ) - if max_age_days < 0: - return _check( - "maintainer_activity", - "warn", - f"invalid maintainer activity threshold for bounty #{bounty_ref}: " - f"max_maintainer_age_days must be >= 0, got {max_age_days}", - ) - delta = now - last_activity - age_days = max(0, int(delta.total_seconds() // 86400)) - if delta > timedelta(days=max_age_days): - return _check( - "maintainer_activity", - "warn", - f"last maintainer activity for bounty #{bounty_ref} was {age_days} days ago", - ) - return _check( - "maintainer_activity", - "pass", - f"maintainer activity for bounty #{bounty_ref} was seen {age_days} days ago", - ) - - -def _title_from_submission(text: str) -> str: - for line in text.splitlines(): - clean = line.strip(" -:\t") - if not clean: - continue - clean = LEADING_BOUNTY_REF_RE.sub("", clean).strip(" -:\t") - if not clean: - continue - if SUMMARY_RE.search(clean) and len(clean.split()) <= 4: - continue - if BOUNTY_REF_RE.search(clean) or EVIDENCE_RE.search(clean): - continue - return " ".join(clean.lower().split()) - return "" - - -def _similarity(left: str, right: str) -> float: - return SequenceMatcher(None, left.lower(), right.lower()).ratio() - - -def _has_evidence(text: str) -> bool: - for line in text.splitlines(): - clean = line.strip() - if not clean: - continue - if SUMMARY_RE.search(clean) and ":" in clean: - continue - if EVIDENCE_RE.search(clean): - return True - return False - - -def _matching_pr_bounty_refs(pr: dict[str, Any]) -> list[int]: - text = "\n".join(str(pr.get(key) or "") for key in ("title", "body")) - return _bounty_refs(text) - - -def _similar_open_prs( - pull_requests: list[dict[str, Any]], bounty_ref: int | None, submission_title: str -) -> list[dict[str, Any]]: - if bounty_ref is None or not submission_title: - return [] - matches: list[dict[str, Any]] = [] - for pr in pull_requests: - if str(pr.get("state") or "OPEN").lower() not in {"open", "opened"}: - continue - if bounty_ref not in _matching_pr_bounty_refs(pr): - continue - title = str(pr.get("title") or "") - if _similarity(submission_title, title) < 0.78: - continue - matches.append( - { - "number": pr.get("number"), - "title": title, - "url": pr.get("url"), - } - ) - return matches - - -def evaluate_submission(data: dict[str, Any]) -> dict[str, Any]: - text = str(data.get("submission_text") or "") - now = _current_time(data) - bounties = { - int(item["number"]): item - for item in data.get("bounties", []) - if isinstance(item, dict) and isinstance(item.get("number"), int) - } - pull_requests = [item for item in data.get("pull_requests", []) if isinstance(item, dict)] - checks: list[dict[str, str]] = [] - load_warning = str(data.get("load_warning") or "").strip() - if load_warning: - checks.append(_check("source_completeness", "warn", load_warning)) - refs = _bounty_refs(text) - bounty_ref = refs[0] if refs else None - if bounty_ref is None: - checks.append( - _check( - "bounty_reference", - "fail", - "submission text must include a bounty reference such as " - "Bounty #, Refs #, Fixes #, or /claim #", - ) - ) - else: - checks.append(_check("bounty_reference", "pass", f"found bounty reference #{bounty_ref}")) - if bounty_ref in _github_linked_issue_refs(text): - checks.append( - _check( - "github_linked_issue", - "pass", - f"GitHub-linking reference found for bounty #{bounty_ref}", - ) - ) - else: - checks.append( - _check( - "github_linked_issue", - "warn", - f"MergeWork bounty reference #{bounty_ref} is valid, but GitHub or bot " - "linked-issue checks may stay skipped without `Refs #" - f"{bounty_ref}`; use closing keywords only when the bounty should close", - ) - ) - if len(refs) > 1: - joined_refs = ", ".join(f"#{ref}" for ref in refs) - checks.append( - _check( - "single_bounty_reference", - "warn", - f"submission references multiple bounties ({joined_refs}); " - "keep one bounty target or split the work", - ) - ) - bounty = bounties.get(bounty_ref) - if bounty is None: - checks.append( - _check( - "bounty_payable", - "warn", - f"referenced bounty #{bounty_ref} was not available in input", - ) - ) - elif not _bounty_is_payable(bounty): - checks.append( - _check( - "bounty_payable", - "fail", - _bounty_payability_fail_message(bounty_ref, bounty), - ) - ) - elif not _bounty_payability_verified(bounty): - checks.append( - _check( - "bounty_payable", - "warn", - f"referenced bounty #{bounty_ref} payability could not be verified", - ) - ) - else: - checks.append( - _check( - "bounty_payable", "pass", _bounty_payability_pass_message(bounty_ref, bounty) - ) - ) - availability_warning = _bounty_availability_warning(bounty_ref, bounty) - if availability_warning is not None: - checks.append(availability_warning) - if bounty is not None: - activity_check = _maintainer_activity_check(bounty_ref, bounty, now) - if activity_check is not None: - checks.append(activity_check) - if "active_attempts" in bounty or "active_attempts_verified" in bounty: - active_attempts = _safe_attempts(bounty) - if active_attempts: - checks.append( - _check( - "active_attempts", - "warn", - f"{len(active_attempts)} active attempt(s) already exist " - f"for bounty #{bounty_ref}", - ) - ) - elif not _active_attempts_verified(bounty): - checks.append( - _check( - "active_attempts", - "warn", - f"active attempts for bounty #{bounty_ref} could not be verified", - ) - ) - else: - checks.append( - _check( - "active_attempts", - "pass", - f"no active attempts found for bounty #{bounty_ref}", - ) - ) - - if SUMMARY_RE.search(text): - checks.append(_check("summary_present", "pass", "summary text found")) - else: - checks.append(_check("summary_present", "warn", "include a concise summary of the work")) - - if _has_evidence(text): - checks.append(_check("evidence_present", "pass", "test or validation evidence found")) - else: - checks.append( - _check( - "evidence_present", - "warn", - "include concrete test or validation evidence before submission", - ) - ) - - similar = _similar_open_prs(pull_requests, bounty_ref, _title_from_submission(text)) - if similar: - checks.append( - _check( - "similar_open_pr", - "warn", - "similar open PRs already reference this bounty", - ) - ) - else: - checks.append(_check("similar_open_pr", "pass", "no similar open PRs found")) - - if any(check["status"] == "fail" for check in checks): - status = "fail" - elif any(check["status"] == "warn" for check in checks): - status = "warn" - else: - status = "pass" - return { - "status": status, - "bounty_reference": bounty_ref, - "checks": checks, - "similar_open_prs": similar, - "active_attempts": _safe_attempts(bounties.get(bounty_ref, {})) if bounty_ref else [], - } - - -def _run_gh_json(args: list[str]) -> Any: - command = " ".join(args) - try: - completed = subprocess.run( - args, - check=True, - capture_output=True, - text=True, - encoding="utf-8", - errors="replace", - timeout=GH_TIMEOUT_SECONDS, - ) - except subprocess.TimeoutExpired as exc: - raise RuntimeError(f"gh command timed out after {GH_TIMEOUT_SECONDS}s: {command}") from exc - except subprocess.CalledProcessError as exc: - raise RuntimeError( - "gh command failed " - f"(exit {exc.returncode}): {command}\n" - f"stdout:\n{exc.stdout or exc.output or ''}\n" - f"stderr:\n{exc.stderr or ''}" - ) from exc - return json.loads(completed.stdout) - - -def _load_issue_maintainer_activity(repo: str, issue_number: int) -> dict[str, Any]: - issue = _run_gh_json( - [ - "gh", - "issue", - "view", - str(issue_number), - "--repo", - repo, - "--json", - "author,comments,createdAt", - ] - ) - activity_times = [] - repo_owner = repo.split("/", 1)[0].lower() - issue_author = str((issue.get("author") or {}).get("login") or "").lower() - created_at = _parse_datetime(issue.get("createdAt")) - if issue_author == repo_owner and created_at is not None: - activity_times.append(created_at) - for comment in issue.get("comments") or []: - if str(comment.get("authorAssociation") or "").upper() not in MAINTAINER_ASSOCIATIONS: - continue - created_at = _parse_datetime(comment.get("createdAt")) - if created_at is not None: - activity_times.append(created_at) - if not activity_times: - return {"maintainer_activity_verified": False} - return { - "maintainer_activity_verified": True, - "last_maintainer_activity_at": _isoformat_utc(max(activity_times)), - } - - -def _load_json_url(url: str, *, description: str) -> Any: - try: - with urlopen(url, timeout=GH_TIMEOUT_SECONDS) as response: - return json.loads(response.read().decode("utf-8")) - except (HTTPError, OSError, URLError, json.JSONDecodeError) as exc: - raise RuntimeError(f"{description} unavailable: {exc}") from exc - - -def _load_api_bounties(repo: str, api_host: str) -> dict[int, dict[str, Any]]: - url = f"{api_host.rstrip('/')}/api/v1/bounties?status=open" - payload = _load_json_url(url, description="MergeWork API bounty data") - if not isinstance(payload, list): - raise RuntimeError("MergeWork API bounty data must be a list") - bounties: dict[int, dict[str, Any]] = {} - for item in payload: - if not isinstance(item, dict) or item.get("repo") != repo: - continue - issue_number = item.get("issue_number") - if not isinstance(issue_number, int): - continue - bounties[issue_number] = { - "id": item.get("id"), - "number": issue_number, - "state": item.get("status", "open"), - "awards_remaining": item.get("awards_remaining"), - } - _copy_effective_availability_fields(item, bounties[issue_number]) - return bounties - - -def _normalize_attempt(raw: dict[str, Any]) -> dict[str, Any]: - return { - "submitter": _attempt_field( - raw, "submitter", "submitter_account", "account", "github_login" - ), - "source_url": _attempt_field(raw, "source_url", "public_source_url", "url"), - "status": _attempt_field(raw, "status"), - "expires_at": _attempt_field(raw, "expires_at", "expiresAt", "expiry_time"), - } - - -def _load_api_attempts(api_host: str, bounty_id: Any) -> list[dict[str, Any]]: - if not isinstance(bounty_id, int): - raise RuntimeError("MergeWork API bounty id unavailable for attempts lookup") - url = f"{api_host.rstrip('/')}/api/v1/bounties/{bounty_id}/attempts" - payload = _load_json_url(url, description="MergeWork API attempts data") - attempts = payload.get("attempts") if isinstance(payload, dict) else payload - if not isinstance(attempts, list): - raise RuntimeError("MergeWork API attempts data must be a list") - return [_normalize_attempt(attempt) for attempt in attempts if isinstance(attempt, dict)] - - -def _load_live_context( - repo: str, - submission_text: str, - api_host: str, - max_maintainer_age_days: int = DEFAULT_MAX_MAINTAINER_AGE_DAYS, -) -> dict[str, Any]: - load_warnings: list[str] = [] - try: - prs = _run_gh_json( - [ - "gh", - "pr", - "list", - "--repo", - repo, - "--state", - "open", - "--limit", - str(GH_PR_SAFETY_CAP), - "--json", - "number,title,url,body,state", - ] - ) - issues = _run_gh_json( - [ - "gh", - "issue", - "list", - "--repo", - repo, - "--state", - "all", - "--limit", - str(GH_ISSUE_SAFETY_CAP), - "--json", - "number,title,state", - ] - ) - except (RuntimeError, FileNotFoundError, json.JSONDecodeError) as exc: - return { - "submission_text": submission_text, - "bounties": [], - "pull_requests": [], - "load_warning": f"live GitHub data unavailable: {exc}", - } - if len(prs) >= GH_PR_SAFETY_CAP: - load_warnings.append( - f"gh pr list reached the {GH_PR_SAFETY_CAP} item safety cap; " - "similar-open-PR checks may be incomplete" - ) - if len(issues) >= GH_ISSUE_SAFETY_CAP: - load_warnings.append( - f"gh issue list reached the {GH_ISSUE_SAFETY_CAP} item safety cap; " - "bounty discovery may be incomplete" - ) - try: - api_bounties = _load_api_bounties(repo, api_host) - except RuntimeError as exc: - api_bounties = {} - load_warnings.append(str(exc)) - referenced_bounties = set(_bounty_refs(submission_text)) - bounties = [] - for issue in issues: - if "bounty" not in str(issue.get("title", "")).lower(): - continue - api_bounty = api_bounties.get(issue["number"], {}) - awards_remaining = api_bounty.get("awards_remaining") - bounty_record = { - "id": api_bounty.get("id"), - "number": issue["number"], - "title": issue.get("title"), - "state": issue.get("state"), - "awards_remaining": awards_remaining, - "payability_verified": issue["number"] in api_bounties - and ( - awards_remaining is not None - or api_bounty.get("effective_awards_remaining") is not None - ), - } - _copy_effective_availability_fields(api_bounty, bounty_record) - if issue["number"] in referenced_bounties: - try: - bounty_record.update(_load_issue_maintainer_activity(repo, issue["number"])) - bounty_record["max_maintainer_age_days"] = max_maintainer_age_days - except (RuntimeError, FileNotFoundError, json.JSONDecodeError) as exc: - bounty_record["maintainer_activity_verified"] = False - load_warnings.append( - f"maintainer activity unavailable for bounty #{issue['number']}: {exc}" - ) - bounty_id = api_bounty.get("id") - if isinstance(bounty_id, int): - try: - bounty_record["active_attempts"] = _load_api_attempts(api_host, bounty_id) - bounty_record["active_attempts_verified"] = True - except RuntimeError as exc: - bounty_record["active_attempts"] = [] - bounty_record["active_attempts_verified"] = False - load_warnings.append( - f"active attempts unavailable for bounty #{issue['number']}: {exc}" - ) - else: - bounty_record["active_attempts"] = [] - bounty_record["active_attempts_verified"] = False - load_warnings.append( - f"active attempts unavailable for bounty #{issue['number']}: " - "MergeWork API bounty id unavailable for attempts lookup" - ) - bounties.append(bounty_record) - data = {"submission_text": submission_text, "bounties": bounties, "pull_requests": prs} - if load_warnings: - data["load_warning"] = "; ".join(load_warnings) - return data - - -def _load_input(path: str) -> dict[str, Any]: - with open(path, encoding="utf-8") as handle: - data = json.load(handle) - if not isinstance(data, dict): - raise ValueError("quality gate input must be a JSON object") - return data - - -def _require_non_empty_path(parser: argparse.ArgumentParser, option_name: str, value: str) -> str: - if not value.strip(): - parser.error(f"{option_name} must be a non-empty path") - return value - - -def format_text(result: dict[str, Any]) -> str: - lines = [f"Submission quality gate: {result['status'].upper()}"] - if result.get("load_warning"): - lines.append(f"Warning: {result['load_warning']}") - if result.get("bounty_reference") is not None: - lines.append(f"Bounty reference: #{result['bounty_reference']}") - for check in result["checks"]: - lines.append(f"- {check['status'].upper()} {check['name']}: {check['message']}") - if result["similar_open_prs"]: - lines.append("Similar open PRs:") - for pr in result["similar_open_prs"]: - lines.append(f"- #{pr['number']}: {pr['title']} {pr.get('url') or ''}".rstrip()) - if result.get("active_attempts"): - lines.append("Active attempts:") - for attempt in result["active_attempts"]: - lines.append(f"- {_format_attempt_summary(attempt)}") - return "\n".join(lines) - - -def main(argv: list[str] | None = None) -> int: - parser = argparse.ArgumentParser(description="Check a MergeWork bounty submission draft.") - source = parser.add_mutually_exclusive_group(required=True) - source.add_argument("--input", help="Read gate input from a JSON fixture file.") - source.add_argument("--text-file", help="Read submission text and live context with gh.") - parser.add_argument("--repo", default="ramimbo/mergework") - parser.add_argument("--api-host", default=DEFAULT_API_HOST, type=public_api_host) - parser.add_argument( - "--max-maintainer-age-days", - type=_non_negative_int, - default=DEFAULT_MAX_MAINTAINER_AGE_DAYS, - help="Warn when the referenced bounty has no maintainer activity within this many days.", - ) - parser.add_argument("--format", choices=["json", "text"], default="text") - args = parser.parse_args(argv) - - if args.input is not None: - data = _load_input(_require_non_empty_path(parser, "--input", args.input)) - else: - text_file = _require_non_empty_path(parser, "--text-file", args.text_file) - with open(text_file, encoding="utf-8") as handle: - data = _load_live_context( - args.repo, - handle.read(), - args.api_host, - args.max_maintainer_age_days, - ) - result = evaluate_submission(data) - if data.get("load_warning"): - result["load_warning"] = data["load_warning"] - - if args.format == "json": - print(json.dumps(result, indent=2, sort_keys=True)) - else: - print(format_text(result)) - return 1 if result["status"] == "fail" else 0 - - -if __name__ == "__main__": - raise SystemExit(main(sys.argv[1:])) +from __future__ import annotations + +import argparse +import json +import re +import subprocess +import sys +from datetime import UTC, datetime, timedelta +from difflib import SequenceMatcher +from pathlib import Path +from typing import Any +from urllib.error import HTTPError, URLError +from urllib.request import urlopen + +if __package__ in {None, ""}: + sys.path.insert(0, str(Path(__file__).resolve().parents[1])) + +from scripts.api_host_args import public_api_host +from scripts.bounty_refs import BOUNTY_REF_RE, GITHUB_LINKED_ISSUE_RE, LEADING_BOUNTY_REF_RE + +GH_TIMEOUT_SECONDS = 30 + + +def _non_negative_int(value: str) -> int: + """Argparse type that rejects negative --max-maintainer-age-days values. + + A negative threshold makes any maintainer activity look stale (the delta + always exceeds a negative window), so the gate emits a misleading + "stale activity" warning instead of flagging the invalid input. + """ + try: + parsed = int(value) + except (TypeError, ValueError): + raise argparse.ArgumentTypeError(f"expected an integer, got {value!r}") from None + if parsed < 0: + raise argparse.ArgumentTypeError(f"must be >= 0, got {parsed}") + return parsed + + +EVIDENCE_RE = re.compile( + r"\b(pytest|ruff|mypy|validation|verified|test evidence|checks? passed)\b", + re.IGNORECASE, +) +SUMMARY_RE = re.compile(r"\b(summary|what changed|changes?)\b", re.IGNORECASE) +DEFAULT_API_HOST = "https://api.mrwk.online" +DEFAULT_MAX_MAINTAINER_AGE_DAYS = 14 +GH_PR_SAFETY_CAP = 101 +GH_ISSUE_SAFETY_CAP = 201 +MAINTAINER_ASSOCIATIONS = {"OWNER", "MEMBER", "COLLABORATOR"} +MAX_BOUNTY_REF = 2**63 - 1 +EFFECTIVE_AVAILABILITY_FIELDS = ( + "effective_awards_remaining", + "effective_available_mrwk", + "availability_state", + "availability_note", + "pending_payout_awards", +) + + +def _check(name: str, status: str, message: str) -> dict[str, str]: + return {"name": name, "status": status, "message": message} + + +def _first_present(raw: dict[str, Any], *names: str) -> Any: + for name in names: + if name in raw: + return raw.get(name) + return None + + +def _int_or_none(value: Any) -> int | None: + try: + return int(value) + except (TypeError, ValueError): + return None + + +def _extract_issue_refs(text: str, pattern: re.Pattern[str]) -> list[int]: + refs: list[int] = [] + seen: set[int] = set() + for match in pattern.findall(text): + try: + ref = int(match) + except ValueError: + continue + if ref > MAX_BOUNTY_REF: + continue + if ref in seen: + continue + seen.add(ref) + refs.append(ref) + return refs + + +def _bounty_refs(text: str) -> list[int]: + return _extract_issue_refs(text, BOUNTY_REF_RE) + + +def _github_linked_issue_refs(text: str) -> list[int]: + return _extract_issue_refs(text, GITHUB_LINKED_ISSUE_RE) + + +def _bounty_has_open_state(raw: dict[str, Any]) -> bool: + return str(_first_present(raw, "state", "status") or "").lower() in {"", "open"} + + +def _effective_awards_remaining(raw: dict[str, Any]) -> int | None: + value = _first_present(raw, "effective_awards_remaining", "effectiveAwardsRemaining") + if value is None: + return None + return _int_or_none(value) + + +def _availability_note_suffix(raw: dict[str, Any]) -> str: + note = str(_first_present(raw, "availability_note", "availabilityNote") or "").strip() + return f" ({note})" if note else "" + + +def _bounty_is_payable(raw: dict[str, Any]) -> bool: + if not _bounty_has_open_state(raw): + return False + effective_remaining = _effective_awards_remaining(raw) + if effective_remaining is not None: + return effective_remaining > 0 + remaining = raw.get("awards_remaining", raw.get("awardsRemaining")) + if remaining is None: + return True + parsed_remaining = _int_or_none(remaining) + return parsed_remaining is not None and parsed_remaining > 0 + + +def _bounty_payability_fail_message(bounty_ref: int, raw: dict[str, Any]) -> str: + if _bounty_has_open_state(raw) and _effective_awards_remaining(raw) is not None: + return ( + f"referenced bounty #{bounty_ref} has no effective awards remaining" + f"{_availability_note_suffix(raw)}" + ) + return f"referenced bounty #{bounty_ref} is closed or exhausted{_availability_note_suffix(raw)}" + + +def _bounty_payability_pass_message(bounty_ref: int, raw: dict[str, Any]) -> str: + effective_remaining = _effective_awards_remaining(raw) + if effective_remaining is None: + return f"referenced bounty #{bounty_ref} is open{_availability_note_suffix(raw)}" + return ( + f"referenced bounty #{bounty_ref} is open with {effective_remaining} " + f"effective award(s) remaining{_availability_note_suffix(raw)}" + ) + + +def _bounty_availability_warning(bounty_ref: int, raw: dict[str, Any]) -> dict[str, str] | None: + if not _bounty_has_open_state(raw): + return None + availability_state = str( + _first_present(raw, "availability_state", "availabilityState") or "" + ).lower() + pending_payout_awards = _int_or_none( + _first_present(raw, "pending_payout_awards", "pendingPayoutAwards") + ) + has_pending_payouts = pending_payout_awards is not None and pending_payout_awards > 0 + has_partial_state = availability_state == "pending_payouts_partial" + if not (has_pending_payouts or has_partial_state): + return None + if _effective_awards_remaining(raw) == 0: + return None + return _check( + "bounty_availability", + "warn", + f"referenced bounty #{bounty_ref} has reduced effective capacity" + f"{_availability_note_suffix(raw)}", + ) + + +def _bounty_payability_verified(raw: dict[str, Any]) -> bool: + return raw.get("payability_verified", True) is not False + + +def _active_attempts_verified(raw: dict[str, Any]) -> bool: + return raw.get("active_attempts_verified", True) is not False + + +def _copy_effective_availability_fields(source: dict[str, Any], target: dict[str, Any]) -> None: + for field in EFFECTIVE_AVAILABILITY_FIELDS: + if field in source: + target[field] = source[field] + + +def _safe_attempts(raw: dict[str, Any]) -> list[dict[str, Any]]: + attempts = raw.get("active_attempts", []) + if not isinstance(attempts, list): + return [] + return [attempt for attempt in attempts if isinstance(attempt, dict)] + + +def _attempt_field(attempt: dict[str, Any], *names: str) -> Any: + for name in names: + value = attempt.get(name) + if value not in (None, ""): + return value + return None + + +def _format_attempt_summary(attempt: dict[str, Any]) -> str: + parts: list[str] = [] + submitter = _attempt_field(attempt, "submitter", "submitter_account", "account", "github_login") + if submitter: + parts.append(f"submitter={submitter}") + source_url = _attempt_field(attempt, "source_url", "public_source_url", "url") + if source_url: + parts.append(f"source={source_url}") + status = _attempt_field(attempt, "status") + if status: + parts.append(f"status={status}") + expires_at = _attempt_field(attempt, "expires_at", "expiresAt", "expiry_time") + if expires_at: + parts.append(f"expires={expires_at}") + return ", ".join(parts) or "active attempt" + + +def _parse_datetime(value: Any) -> datetime | None: + if not isinstance(value, str) or not value: + return None + try: + parsed = datetime.fromisoformat(value.replace("Z", "+00:00")) + except ValueError: + return None + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=UTC) + return parsed.astimezone(UTC) + + +def _isoformat_utc(value: datetime) -> str: + return value.astimezone(UTC).isoformat().replace("+00:00", "Z") + + +def _current_time(data: dict[str, Any]) -> datetime: + return _parse_datetime(data.get("now")) or datetime.now(UTC) + + +def _maintainer_activity_check( + bounty_ref: int, bounty: dict[str, Any], now: datetime +) -> dict[str, str] | None: + if "last_maintainer_activity_at" not in bounty and "maintainer_activity_verified" not in bounty: + return None + if bounty.get("maintainer_activity_verified") is False: + return _check( + "maintainer_activity", + "warn", + f"recent maintainer activity for bounty #{bounty_ref} could not be verified", + ) + last_activity = _parse_datetime(bounty.get("last_maintainer_activity_at")) + if last_activity is None: + return _check( + "maintainer_activity", + "warn", + f"recent maintainer activity for bounty #{bounty_ref} could not be verified", + ) + try: + max_age_days = int(bounty.get("max_maintainer_age_days", DEFAULT_MAX_MAINTAINER_AGE_DAYS)) + except (TypeError, ValueError): + return _check( + "maintainer_activity", + "warn", + f"recent maintainer activity for bounty #{bounty_ref} could not be verified", + ) + if max_age_days < 0: + return _check( + "maintainer_activity", + "warn", + f"invalid maintainer activity threshold for bounty #{bounty_ref}: " + f"max_maintainer_age_days must be >= 0, got {max_age_days}", + ) + delta = now - last_activity + age_days = max(0, int(delta.total_seconds() // 86400)) + if delta > timedelta(days=max_age_days): + return _check( + "maintainer_activity", + "warn", + f"last maintainer activity for bounty #{bounty_ref} was {age_days} days ago", + ) + return _check( + "maintainer_activity", + "pass", + f"maintainer activity for bounty #{bounty_ref} was seen {age_days} days ago", + ) + + +def _title_from_submission(text: str) -> str: + for line in text.splitlines(): + clean = line.strip(" -:\t") + if not clean: + continue + clean = LEADING_BOUNTY_REF_RE.sub("", clean).strip(" -:\t") + if not clean: + continue + if SUMMARY_RE.search(clean) and len(clean.split()) <= 4: + continue + if BOUNTY_REF_RE.search(clean) or EVIDENCE_RE.search(clean): + continue + return " ".join(clean.lower().split()) + return "" + + +def _similarity(left: str, right: str) -> float: + return SequenceMatcher(None, left.lower(), right.lower()).ratio() + + +def _has_evidence(text: str) -> bool: + for line in text.splitlines(): + clean = line.strip() + if not clean: + continue + if SUMMARY_RE.search(clean) and ":" in clean: + continue + if EVIDENCE_RE.search(clean): + return True + return False + + +def _matching_pr_bounty_refs(pr: dict[str, Any]) -> list[int]: + text = "\n".join(str(pr.get(key) or "") for key in ("title", "body")) + return _bounty_refs(text) + + +def _similar_open_prs( + pull_requests: list[dict[str, Any]], bounty_ref: int | None, submission_title: str +) -> list[dict[str, Any]]: + if bounty_ref is None or not submission_title: + return [] + matches: list[dict[str, Any]] = [] + for pr in pull_requests: + if str(pr.get("state") or "OPEN").lower() not in {"open", "opened"}: + continue + if bounty_ref not in _matching_pr_bounty_refs(pr): + continue + title = str(pr.get("title") or "") + if _similarity(submission_title, title) < 0.78: + continue + matches.append( + { + "number": pr.get("number"), + "title": title, + "url": pr.get("url"), + } + ) + return matches + + +def evaluate_submission(data: dict[str, Any]) -> dict[str, Any]: + text = str(data.get("submission_text") or "") + now = _current_time(data) + bounties = { + int(item["number"]): item + for item in data.get("bounties", []) + if isinstance(item, dict) and isinstance(item.get("number"), int) + } + pull_requests = [item for item in data.get("pull_requests", []) if isinstance(item, dict)] + checks: list[dict[str, str]] = [] + load_warning = str(data.get("load_warning") or "").strip() + if load_warning: + checks.append(_check("source_completeness", "warn", load_warning)) + refs = _bounty_refs(text) + bounty_ref = refs[0] if refs else None + if bounty_ref is None: + checks.append( + _check( + "bounty_reference", + "fail", + "submission text must include a bounty reference such as " + "Bounty #, Refs #, Fixes #, or /claim #", + ) + ) + else: + checks.append(_check("bounty_reference", "pass", f"found bounty reference #{bounty_ref}")) + if bounty_ref in _github_linked_issue_refs(text): + checks.append( + _check( + "github_linked_issue", + "pass", + f"GitHub-linking reference found for bounty #{bounty_ref}", + ) + ) + else: + checks.append( + _check( + "github_linked_issue", + "warn", + f"MergeWork bounty reference #{bounty_ref} is valid, but GitHub or bot " + "linked-issue checks may stay skipped without `Refs #" + f"{bounty_ref}`; use closing keywords only when the bounty should close", + ) + ) + if len(refs) > 1: + joined_refs = ", ".join(f"#{ref}" for ref in refs) + checks.append( + _check( + "single_bounty_reference", + "warn", + f"submission references multiple bounties ({joined_refs}); " + "keep one bounty target or split the work", + ) + ) + bounty = bounties.get(bounty_ref) + if bounty is None: + checks.append( + _check( + "bounty_payable", + "warn", + f"referenced bounty #{bounty_ref} was not available in input", + ) + ) + elif not _bounty_is_payable(bounty): + checks.append( + _check( + "bounty_payable", + "fail", + _bounty_payability_fail_message(bounty_ref, bounty), + ) + ) + elif not _bounty_payability_verified(bounty): + checks.append( + _check( + "bounty_payable", + "warn", + f"referenced bounty #{bounty_ref} payability could not be verified", + ) + ) + else: + checks.append( + _check( + "bounty_payable", "pass", _bounty_payability_pass_message(bounty_ref, bounty) + ) + ) + availability_warning = _bounty_availability_warning(bounty_ref, bounty) + if availability_warning is not None: + checks.append(availability_warning) + if bounty is not None: + activity_check = _maintainer_activity_check(bounty_ref, bounty, now) + if activity_check is not None: + checks.append(activity_check) + if "active_attempts" in bounty or "active_attempts_verified" in bounty: + active_attempts = _safe_attempts(bounty) + if active_attempts: + checks.append( + _check( + "active_attempts", + "warn", + f"{len(active_attempts)} active attempt(s) already exist " + f"for bounty #{bounty_ref}", + ) + ) + elif not _active_attempts_verified(bounty): + checks.append( + _check( + "active_attempts", + "warn", + f"active attempts for bounty #{bounty_ref} could not be verified", + ) + ) + else: + checks.append( + _check( + "active_attempts", + "pass", + f"no active attempts found for bounty #{bounty_ref}", + ) + ) + + if SUMMARY_RE.search(text): + checks.append(_check("summary_present", "pass", "summary text found")) + else: + checks.append(_check("summary_present", "warn", "include a concise summary of the work")) + + if _has_evidence(text): + checks.append(_check("evidence_present", "pass", "test or validation evidence found")) + else: + checks.append( + _check( + "evidence_present", + "warn", + "include concrete test or validation evidence before submission", + ) + ) + + similar = _similar_open_prs(pull_requests, bounty_ref, _title_from_submission(text)) + if similar: + checks.append( + _check( + "similar_open_pr", + "warn", + "similar open PRs already reference this bounty", + ) + ) + else: + checks.append(_check("similar_open_pr", "pass", "no similar open PRs found")) + + if any(check["status"] == "fail" for check in checks): + status = "fail" + elif any(check["status"] == "warn" for check in checks): + status = "warn" + else: + status = "pass" + return { + "status": status, + "bounty_reference": bounty_ref, + "checks": checks, + "similar_open_prs": similar, + "active_attempts": _safe_attempts(bounties.get(bounty_ref, {})) if bounty_ref else [], + } + + +def _run_gh_json(args: list[str]) -> Any: + command = " ".join(args) + try: + completed = subprocess.run( + args, + check=True, + capture_output=True, + text=True, + encoding="utf-8", + errors="replace", + timeout=GH_TIMEOUT_SECONDS, + ) + except subprocess.TimeoutExpired as exc: + raise RuntimeError(f"gh command timed out after {GH_TIMEOUT_SECONDS}s: {command}") from exc + except subprocess.CalledProcessError as exc: + raise RuntimeError( + "gh command failed " + f"(exit {exc.returncode}): {command}\n" + f"stdout:\n{exc.stdout or exc.output or ''}\n" + f"stderr:\n{exc.stderr or ''}" + ) from exc + return json.loads(completed.stdout) + + +def _load_issue_maintainer_activity(repo: str, issue_number: int) -> dict[str, Any]: + issue = _run_gh_json( + [ + "gh", + "issue", + "view", + str(issue_number), + "--repo", + repo, + "--json", + "author,comments,createdAt", + ] + ) + activity_times = [] + repo_owner = repo.split("/", 1)[0].lower() + issue_author = str((issue.get("author") or {}).get("login") or "").lower() + created_at = _parse_datetime(issue.get("createdAt")) + if issue_author == repo_owner and created_at is not None: + activity_times.append(created_at) + for comment in issue.get("comments") or []: + if str(comment.get("authorAssociation") or "").upper() not in MAINTAINER_ASSOCIATIONS: + continue + created_at = _parse_datetime(comment.get("createdAt")) + if created_at is not None: + activity_times.append(created_at) + if not activity_times: + return {"maintainer_activity_verified": False} + return { + "maintainer_activity_verified": True, + "last_maintainer_activity_at": _isoformat_utc(max(activity_times)), + } + + +def _load_json_url(url: str, *, description: str) -> Any: + try: + with urlopen(url, timeout=GH_TIMEOUT_SECONDS) as response: + return json.loads(response.read().decode("utf-8")) + except (HTTPError, OSError, URLError, json.JSONDecodeError) as exc: + raise RuntimeError(f"{description} unavailable: {exc}") from exc + + +def _load_api_bounties(repo: str, api_host: str) -> dict[int, dict[str, Any]]: + url = f"{api_host.rstrip('/')}/api/v1/bounties?status=open" + payload = _load_json_url(url, description="MergeWork API bounty data") + if not isinstance(payload, list): + raise RuntimeError("MergeWork API bounty data must be a list") + bounties: dict[int, dict[str, Any]] = {} + for item in payload: + if not isinstance(item, dict) or item.get("repo") != repo: + continue + issue_number = item.get("issue_number") + if not isinstance(issue_number, int): + continue + bounties[issue_number] = { + "id": item.get("id"), + "number": issue_number, + "state": item.get("status", "open"), + "awards_remaining": item.get("awards_remaining"), + } + _copy_effective_availability_fields(item, bounties[issue_number]) + return bounties + + +def _normalize_attempt(raw: dict[str, Any]) -> dict[str, Any]: + return { + "submitter": _attempt_field( + raw, "submitter", "submitter_account", "account", "github_login" + ), + "source_url": _attempt_field(raw, "source_url", "public_source_url", "url"), + "status": _attempt_field(raw, "status"), + "expires_at": _attempt_field(raw, "expires_at", "expiresAt", "expiry_time"), + } + + +def _load_api_attempts(api_host: str, bounty_id: Any) -> list[dict[str, Any]]: + if not isinstance(bounty_id, int): + raise RuntimeError("MergeWork API bounty id unavailable for attempts lookup") + url = f"{api_host.rstrip('/')}/api/v1/bounties/{bounty_id}/attempts" + payload = _load_json_url(url, description="MergeWork API attempts data") + attempts = payload.get("attempts") if isinstance(payload, dict) else payload + if not isinstance(attempts, list): + raise RuntimeError("MergeWork API attempts data must be a list") + return [_normalize_attempt(attempt) for attempt in attempts if isinstance(attempt, dict)] + + +def _load_live_context( + repo: str, + submission_text: str, + api_host: str, + max_maintainer_age_days: int = DEFAULT_MAX_MAINTAINER_AGE_DAYS, +) -> dict[str, Any]: + load_warnings: list[str] = [] + try: + prs = _run_gh_json( + [ + "gh", + "pr", + "list", + "--repo", + repo, + "--state", + "open", + "--limit", + str(GH_PR_SAFETY_CAP), + "--json", + "number,title,url,body,state", + ] + ) + issues = _run_gh_json( + [ + "gh", + "issue", + "list", + "--repo", + repo, + "--state", + "all", + "--limit", + str(GH_ISSUE_SAFETY_CAP), + "--json", + "number,title,state", + ] + ) + except (RuntimeError, FileNotFoundError, json.JSONDecodeError) as exc: + return { + "submission_text": submission_text, + "bounties": [], + "pull_requests": [], + "load_warning": f"live GitHub data unavailable: {exc}", + } + if len(prs) >= GH_PR_SAFETY_CAP: + load_warnings.append( + f"gh pr list reached the {GH_PR_SAFETY_CAP} item safety cap; " + "similar-open-PR checks may be incomplete" + ) + if len(issues) >= GH_ISSUE_SAFETY_CAP: + load_warnings.append( + f"gh issue list reached the {GH_ISSUE_SAFETY_CAP} item safety cap; " + "bounty discovery may be incomplete" + ) + try: + api_bounties = _load_api_bounties(repo, api_host) + except RuntimeError as exc: + api_bounties = {} + load_warnings.append(str(exc)) + referenced_bounties = set(_bounty_refs(submission_text)) + bounties = [] + for issue in issues: + if "bounty" not in str(issue.get("title", "")).lower(): + continue + api_bounty = api_bounties.get(issue["number"], {}) + awards_remaining = api_bounty.get("awards_remaining") + bounty_record = { + "id": api_bounty.get("id"), + "number": issue["number"], + "title": issue.get("title"), + "state": issue.get("state"), + "awards_remaining": awards_remaining, + "payability_verified": issue["number"] in api_bounties + and ( + awards_remaining is not None + or api_bounty.get("effective_awards_remaining") is not None + ), + } + _copy_effective_availability_fields(api_bounty, bounty_record) + if issue["number"] in referenced_bounties: + try: + bounty_record.update(_load_issue_maintainer_activity(repo, issue["number"])) + bounty_record["max_maintainer_age_days"] = max_maintainer_age_days + except (RuntimeError, FileNotFoundError, json.JSONDecodeError) as exc: + bounty_record["maintainer_activity_verified"] = False + load_warnings.append( + f"maintainer activity unavailable for bounty #{issue['number']}: {exc}" + ) + bounty_id = api_bounty.get("id") + if isinstance(bounty_id, int): + try: + bounty_record["active_attempts"] = _load_api_attempts(api_host, bounty_id) + bounty_record["active_attempts_verified"] = True + except RuntimeError as exc: + bounty_record["active_attempts"] = [] + bounty_record["active_attempts_verified"] = False + load_warnings.append( + f"active attempts unavailable for bounty #{issue['number']}: {exc}" + ) + else: + bounty_record["active_attempts"] = [] + bounty_record["active_attempts_verified"] = False + load_warnings.append( + f"active attempts unavailable for bounty #{issue['number']}: " + "MergeWork API bounty id unavailable for attempts lookup" + ) + bounties.append(bounty_record) + data = {"submission_text": submission_text, "bounties": bounties, "pull_requests": prs} + if load_warnings: + data["load_warning"] = "; ".join(load_warnings) + return data + + +def _load_input(path: str) -> dict[str, Any]: + with open(path, encoding="utf-8") as handle: + data = json.load(handle) + if not isinstance(data, dict): + raise ValueError("quality gate input must be a JSON object") + return data + + +def _require_non_empty_path(parser: argparse.ArgumentParser, option_name: str, value: str) -> str: + if not value.strip(): + parser.error(f"{option_name} must be a non-empty path") + return value + + +def format_text(result: dict[str, Any]) -> str: + lines = [f"Submission quality gate: {result['status'].upper()}"] + if result.get("load_warning"): + lines.append(f"Warning: {result['load_warning']}") + if result.get("bounty_reference") is not None: + lines.append(f"Bounty reference: #{result['bounty_reference']}") + for check in result["checks"]: + lines.append(f"- {check['status'].upper()} {check['name']}: {check['message']}") + if result["similar_open_prs"]: + lines.append("Similar open PRs:") + for pr in result["similar_open_prs"]: + lines.append(f"- #{pr['number']}: {pr['title']} {pr.get('url') or ''}".rstrip()) + if result.get("active_attempts"): + lines.append("Active attempts:") + for attempt in result["active_attempts"]: + lines.append(f"- {_format_attempt_summary(attempt)}") + return "\n".join(lines) + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(description="Check a MergeWork bounty submission draft.") + source = parser.add_mutually_exclusive_group(required=True) + source.add_argument("--input", help="Read gate input from a JSON fixture file.") + source.add_argument("--text-file", help="Read submission text and live context with gh.") + parser.add_argument("--repo", default="ramimbo/mergework") + parser.add_argument("--api-host", default=DEFAULT_API_HOST, type=public_api_host) + parser.add_argument( + "--max-maintainer-age-days", + type=_non_negative_int, + default=DEFAULT_MAX_MAINTAINER_AGE_DAYS, + help="Warn when the referenced bounty has no maintainer activity within this many days.", + ) + parser.add_argument("--format", choices=["json", "text"], default="text") + args = parser.parse_args(argv) + + if args.input is not None: + data = _load_input(_require_non_empty_path(parser, "--input", args.input)) + else: + text_file = _require_non_empty_path(parser, "--text-file", args.text_file) + with open(text_file, encoding="utf-8") as handle: + data = _load_live_context( + args.repo, + handle.read(), + args.api_host, + args.max_maintainer_age_days, + ) + result = evaluate_submission(data) + if data.get("load_warning"): + result["load_warning"] = data["load_warning"] + + if args.format == "json": + print(json.dumps(result, indent=2, sort_keys=True)) + else: + print(format_text(result)) + return 1 if result["status"] == "fail" else 0 + + +if __name__ == "__main__": + raise SystemExit(main(sys.argv[1:])) diff --git a/tests/test_pr_queue_health.py b/tests/test_pr_queue_health.py index e965c750..22e42866 100644 --- a/tests/test_pr_queue_health.py +++ b/tests/test_pr_queue_health.py @@ -546,3 +546,25 @@ def fake_run(args, **kwargs): with pytest.raises(RuntimeError, match="pr list reached the 201 item safety cap"): pr_queue_health.load_live_queue("ramimbo/mergework") + + +@pytest.mark.parametrize( + ("source_args", "expected_message"), + ( + (["--input", ""], "--input must be a non-empty value"), + (["--input", " "], "--input must be a non-empty value"), + (["--repo", ""], "--repo must be a non-empty value"), + (["--repo", " "], "--repo must be a non-empty value"), + (["--repo", " ramimbo/mergework "], "--repo must not include"), + ), +) +def test_pr_queue_health_rejects_empty_source_args( + source_args: list[str], + expected_message: str, + capsys, +) -> None: + with pytest.raises(SystemExit) as excinfo: + main([*source_args, "--format", "json"]) + + assert excinfo.value.code == 2 + assert expected_message in capsys.readouterr().err