diff --git a/automaton/bounty_hunter/README.md b/automaton/bounty_hunter/README.md new file mode 100644 index 000000000..76ee199d2 --- /dev/null +++ b/automaton/bounty_hunter/README.md @@ -0,0 +1,119 @@ +# Autonomous Bounty Hunter Agent + +An autonomous multi-agent system for SolFoundry that finds, analyzes, implements, tests, and submits solutions for GitHub bounty issues — entirely automatically. + +## Overview + +Built for **Bounty #861** — *Full Autonomous Bounty-Hunting Agent* — this agent implements a state machine that: + +1. **Scans** for open bounties via GitHub API (filtered by `bounty` label) +2. **Analyzes** each issue's requirements and codebase context +3. **Plans** implementation using an LLM planner +4. **Implements** code by writing/editing files in a local fork +5. **Tests** using the project's existing test framework +6. **Submits** a properly formatted PR to the SolFoundry repo + +## Architecture + +``` +BountyHunterAgent (state machine) +├── GitHubClient — GitHub API: list issues, create branches, push, create PRs +├── Planner — LLM: generate implementation plans and code +├── Coder — Local file operations: write, commit, push +└── Tester — Detect framework, run tests, parse results +``` + +## State Machine + +``` +IDLE → SCANNING → ANALYZING → PLANNING → IMPLEMENTING → TESTING → SUBMITTING → DONE + ↓ + BLOCKED (retry) +``` + +## Installation + +```bash +# Requires Python 3.10+ +pip install -e automaton/bounty_hunter + +# Or install dependencies manually +pip install openai # Optional, for LLM planning +``` + +## Configuration + +Set these environment variables: + +```bash +export GITHUB_TOKEN="ghp_your_token_here" +export OPENAI_API_KEY="sk-your-key" # Optional, enables LLM planning +``` + +## Usage + +### CLI + +```bash +# Scan for open bounties +python -m automaton.bounty_hunter.agent --scan + +# Hunt a specific bounty +python -m automaton.bounty_hunter.agent --bounty 861 + +# Auto-select best available bounty +python -m automaton.bounty_hunter.agent +``` + +### Python API + +```python +from automaton.bounty_hunter import BountyHunterAgent, AgentConfig + +config = AgentConfig( + github_owner="your-github-username", + github_repo="solfoundry", + wallet_address="YourSolanaWalletAddress", +) + +agent = BountyHunterAgent(config) + +# Hunt a specific bounty +result = agent.run(bounty_number=861) +print(f"PR: {result.pr_url}") + +# Or auto-select the best bounty +result = agent.hunt_best_bounty() +``` + +## Output + +When a bounty is successfully hunted: + +- A new branch in your fork: `feat/bounty-hunter-{number}` +- Files written and committed +- Tests run (if applicable) +- PR submitted to SolFoundry/solfoundry with: + - `Closes #{number}` reference + - Implementation plan in PR body + - Your wallet address for payout + +## Bounty Tiers & Scores + +| Tier | Min Score | Reward | Notes | +|------|-----------|--------|-------| +| T1 | 6.0/10 | Up to $100 | Open race | +| T2 | 6.5/10 | $100–$500 | Assigned bounty | +| T3 | 7.0/10 | $500+ | Full autonomy | + +## Module Reference + +- `agent.py` — Main `BountyHunterAgent` class + `AgentConfig` +- `github_client.py` — `GitHubClient` + `Bounty` dataclass +- `planner.py` — `Planner` (LLM-based) + `ImplementationPlan` +- `coder.py` — `Coder` for local file operations +- `tester.py` — `Tester` for test execution + `TestResult` + +## License + +MIT — SolFoundry Bounty Platform diff --git a/automaton/bounty_hunter/__init__.py b/automaton/bounty_hunter/__init__.py new file mode 100644 index 000000000..ebdd02c1b --- /dev/null +++ b/automaton/bounty_hunter/__init__.py @@ -0,0 +1,28 @@ +""" +SolFoundry Autonomous Bounty Hunter Agent + +An autonomous multi-agent system that: +1. Scans for open bounties on GitHub +2. Analyzes requirements and codebase +3. Plans implementation approach +4. Implements solutions +5. Runs tests +6. Submits PRs +""" + +from .agent import BountyHunterAgent, AgentState +from .github_client import GitHubClient +from .planner import Planner +from .coder import Coder +from .tester import Tester + +__all__ = [ + "BountyHunterAgent", + "AgentState", + "GitHubClient", + "Planner", + "Coder", + "Tester", +] + +__version__ = "0.1.0" diff --git a/automaton/bounty_hunter/agent.py b/automaton/bounty_hunter/agent.py new file mode 100644 index 000000000..1afed99a1 --- /dev/null +++ b/automaton/bounty_hunter/agent.py @@ -0,0 +1,367 @@ +""" +Main Bounty Hunter Agent — State Machine Implementation. + +State transitions: + IDLE → SCANNING → ANALYZING → PLANNING → IMPLEMENTING → TESTING → SUBMITTING → DONE + ↓ + BLOCKED (retry) +""" + +import os +import time +import json +import logging +from enum import Enum +from dataclasses import dataclass, field +from typing import Optional +from datetime import datetime + +from .github_client import GitHubClient, Bounty, BountyTier +from .planner import Planner, ImplementationPlan +from .coder import Coder, FileChange +from .tester import Tester, TestResult + +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") +logger = logging.getLogger("bounty_hunter") + + +class AgentState(Enum): + IDLE = "idle" + SCANNING = "scanning" + ANALYZING = "analyzing" + PLANNING = "planning" + IMPLEMENTING = "implementing" + TESTING = "testing" + SUBMITTING = "submitting" + DONE = "done" + BLOCKED = "blocked" + ERROR = "error" + + +@dataclass +class AgentConfig: + """Configuration for the Bounty Hunter Agent.""" + github_owner: str = "liufang88789-ui" # Fork owner + github_repo: str = "solfoundry" + upstream_owner: str = "SolFoundry" + upstream_repo: str = "solfoundry" + local_repo_path: str = "/tmp/solfoundry-bounty" + branch_prefix: str = "feat/bounty-hunter" + wallet_address: str = "7UqBdYyy9LG59Un6yzjAW8HPcTC4J63B9cZxBHWhhHsg" + evm_wallet: str = "0x7F3a01563C504bD57aa465dd6273Ef21AF8F7784" + max_retries: int = 2 + min_bounty_tier: BountyTier = BountyTier.TIER_3 # Only go for T3 by default + # LLM config + llm_model: str = "gpt-4" + llm_api_key: str = None + llm_api_base: str = None + + +@dataclass +class AgentRun: + """Record of a single agent run for a specific bounty.""" + bounty_number: int + bounty_title: str + state: AgentState = AgentState.IDLE + started_at: datetime = field(default_factory=datetime.utcnow) + completed_at: Optional[datetime] = None + plan: Optional[ImplementationPlan] = None + changes: list[FileChange] = field(default_factory=list) + test_result: Optional[TestResult] = None + pr_url: Optional[str] = None + pr_number: Optional[int] = None + error_message: Optional[str] = None + retry_count: int = 0 + + +class BountyHunterAgent: + """ + Autonomous Bounty Hunting Agent. + + A state-machine agent that finds bounties, plans implementations, + writes code, runs tests, and submits PRs. + """ + + def __init__(self, config: AgentConfig = None): + self.config = config or AgentConfig() + self.github = GitHubClient( + owner=self.config.github_owner, + repo=self.config.github_repo, + upstream_owner=self.config.upstream_owner, + upstream_repo=self.config.upstream_repo, + ) + self.github.token = os.environ.get("GITHUB_TOKEN", self.github.token) + + self.planner = Planner( + model=self.config.llm_model, + api_key=self.config.llm_api_key or os.environ.get("OPENAI_API_KEY"), + api_base=self.config.llm_api_base or os.environ.get("OPENAI_API_BASE"), + ) + + self.coder = Coder(self.config.local_repo_path) + self.tester = Tester(self.config.local_repo_path) + + self.current_run: Optional[AgentRun] = None + self._state = AgentState.IDLE + + @property + def state(self) -> AgentState: + return self._state + + def _set_state(self, state: AgentState): + self._state = state + if self.current_run: + self.current_run.state = state + + def run(self, bounty_number: int = None, bounty: Bounty = None) -> AgentRun: + """ + Execute a full bounty hunting run. + Either provide a bounty_number (will fetch it) or a pre-fetched Bounty object. + """ + if bounty is None: + if bounty_number is None: + raise ValueError("Must provide either bounty_number or bounty") + issue = self.github.get_issue(bounty_number) + bounty = Bounty.from_issue(issue) + + self.current_run = AgentRun(bounty_number=bounty.number, bounty_title=bounty.title) + self._set_state(AgentState.SCANNING) + + logger.info(f"Starting bounty hunt: #{bounty.number} — {bounty.title}") + + try: + self._run_impl(bounty) + self._set_state(AgentState.DONE) + self.current_run.completed_at = datetime.utcnow() + except Exception as e: + logger.error(f"Error during bounty hunt: {e}") + self._set_state(AgentState.ERROR) + self.current_run.error_message = str(e) + self.current_run.completed_at = datetime.utcnow() + + return self.current_run + + def _run_impl(self, bounty: Bounty): + """Implementation of the bounty hunting state machine.""" + + # ── ANALYZING ────────────────────────────────────────── + self._set_state(AgentState.ANALYZING) + logger.info(f"Analyzing bounty #{bounty.number}...") + + issue_full = self.github.get_issue(bounty.number) + bounty_body = issue_full.get("body", "") + bounty_title = issue_full.get("title", "") + + # Get codebase structure for context + structure = self.coder.get_codebase_structure(max_depth=3) + logger.info(f"Codebase structure:\n{structure[:500]}") + + # ── PLANNING ────────────────────────────────────────── + self._set_state(AgentState.PLANNING) + logger.info(f"Creating implementation plan for #{bounty.number}...") + + plan = self.planner.create_plan( + bounty_body=bounty_body, + bounty_title=bounty_title, + bounty_number=bounty.number, + codebase_structure=structure, + ) + + self.current_run.plan = plan + logger.info(f"Plan created: {plan.summary}") + logger.info(f"Complexity: {plan.estimated_complexity}, Steps: {len(plan.steps)}") + + # ── IMPLEMENTING ────────────────────────────────────── + self._set_state(AgentState.IMPLEMENTING) + logger.info(f"Implementing bounty #{bounty.number}...") + + # Create a branch for this work + branch_name = f"{self.config.branch_prefix}-{bounty.number}" + try: + self.github.create_branch(branch_name) + logger.info(f"Branch created: {branch_name}") + except Exception as e: + logger.warning(f"Branch may already exist or fork not ready: {e}") + + # Implement each step + all_changes = {} + for step in plan.steps: + logger.info(f"Implementing step {step.order}: {step.description}") + + code_map = self.planner.generate_code( + plan=plan, + codebase_context=structure, + step=step + ) + + if code_map: + # Apply to local repo + applied = self.coder.apply_changes(code_map) + self.current_run.changes.extend(applied) + + for path, content in code_map.items(): + all_changes[path] = content + logger.info(f" Wrote: {path}") + else: + logger.warning(f" No code generated for step {step.order}") + + if not all_changes: + logger.error("No code was generated!") + raise RuntimeError("No code generated by planner") + + # ── TESTING ───────────────────────────────────────────── + self._set_state(AgentState.TESTING) + logger.info("Running tests...") + + test_result = self.tester.run_tests() + self.current_run.test_result = test_result + + if test_result.passed: + logger.info(f"Tests passed ({test_result.passed_tests}/{test_result.total_tests})") + else: + logger.warning(f"Tests failed: {test_result.output[-500:]}") + # Continue anyway if only minor failures + + # ── SUBMITTING ────────────────────────────────────────── + self._set_state(AgentState.SUBMITTING) + logger.info("Creating pull request...") + + changed_files = list(all_changes.keys()) + + # Stage and commit + commit_msg = f"feat: {bounty.title} (Bounty #{bounty.number})\n\nCloses #{bounty.number}" + code, commit_out = self.coder.stage_and_commit(changed_files, commit_msg) + + if code != 0: + logger.warning(f"Commit issue: {commit_out}") + # Try with force-add + self.coder.run_command(["git", "add", "-f", "--"] + changed_files) + self.coder.run_command(["git", "commit", "-m", commit_msg]) + + # Push branch + push_code, push_out = self.coder.push_branch(branch_name) + if push_code != 0: + logger.warning(f"Push may have failed: {push_out}") + + # Create PR + pr_body = self._build_pr_body(bounty, plan) + pr = self.github.create_pull_request( + title=f"feat: {bounty.title} (Bounty #{bounty.number})", + body=pr_body, + head_branch=branch_name, + ) + + self.current_run.pr_url = pr.get("html_url") + self.current_run.pr_number = pr.get("number") + logger.info(f"PR created: {self.current_run.pr_url}") + + def _build_pr_body(self, bounty: Bounty, plan: ImplementationPlan) -> str: + """Build the PR description body following SolFoundry format.""" + steps_md = "\n".join( + f"{i+1}. **{s.description}** — modify `{', '.join(s.files_to_modify)}`, create `{', '.join(s.files_to_create)}`" + for i, s in enumerate(plan.steps) + ) + + return f"""## Overview + +{plan.summary} + +*Submitted by Autonomous Bounty Hunter Agent — SolFoundry Bounty #{bounty.number}* + +--- + +## Implementation Details + +{steps_md} + +## Testing + +- Framework: {self.tester.detect_test_framework() or 'standard test suite'} +- Result: {self.current_run.test_result.summary if self.current_run.test_result else 'N/A'} + +## Files Changed + +{chr(10).join(f"- `{f}`" for f in self.current_run.changes)} + +--- + +## Wallet + +**Solana:** `{self.config.wallet_address}` + +**EVM:** `{self.config.evm_wallet}` + +--- + +Closes #{bounty.number} +""" + + def scan_for_bounties(self) -> list[Bounty]: + """ + Scan for available bounties. + Returns list of Bounty objects. + """ + logger.info("Scanning for open bounties...") + bounties = self.github.list_open_bounties() + logger.info(f"Found {len(bounties)} open bounties") + return bounties + + def hunt_best_bounty(self) -> AgentRun: + """ + Find the best available bounty and hunt it. + Returns the AgentRun result. + """ + bounties = self.scan_for_bounties() + + if not bounties: + logger.info("No suitable bounties found") + self._set_state(AgentState.IDLE) + return None + + # Pick the best one (highest tier, unassigned) + best = max(bounties, key=lambda b: ( + 3 if b.tier == BountyTier.TIER_3 else 2 if b.tier == BountyTier.TIER_2 else 1 + )) + + logger.info(f"Selected best bounty: #{best.number} ({best.tier} — {best.title})") + return self.run(bounty=best) + + +# ── CLI Entry Point ────────────────────────────────────────────────────────── + +def main(): + import argparse + + parser = argparse.ArgumentParser(description="SolFoundry Autonomous Bounty Hunter") + parser.add_argument("--bounty", type=int, help="Bounty issue number to hunt") + parser.add_argument("--scan", action="store_true", help="Only scan, don't hunt") + parser.add_argument("--branch-prefix", default="feat/bounty-hunter") + args = parser.parse_args() + + config = AgentConfig(branch_prefix=args.branch_prefix) + agent = BountyHunterAgent(config) + + if args.scan: + bounties = agent.scan_for_bounties() + print(f"\n{'#':<6} {'Tier':<8} {'Domain':<12} {'Title'}") + print("-" * 80) + for b in sorted(bounties, key=lambda x: (x.tier.value if x.tier else ""), reverse=True): + tier = b.tier.value if b.tier else "?" + print(f"{b.number:<6} {tier:<8} {(b.domain or ''):<12} {b.title[:50]}") + elif args.bounty: + result = agent.run(bounty_number=args.bounty) + print(f"\nResult: {result.state.value}") + if result.pr_url: + print(f"PR: {result.pr_url}") + if result.error_message: + print(f"Error: {result.error_message}") + else: + result = agent.hunt_best_bounty() + if result and result.pr_url: + print(f"\nPR submitted: {result.pr_url}") + elif result: + print(f"\nState: {result.state.value}") + + +if __name__ == "__main__": + main() diff --git a/automaton/bounty_hunter/coder.py b/automaton/bounty_hunter/coder.py new file mode 100644 index 000000000..cb7043624 --- /dev/null +++ b/automaton/bounty_hunter/coder.py @@ -0,0 +1,165 @@ +""" +Coder module for the Bounty Hunter Agent. +Handles code generation, file creation, and codebase exploration. +""" + +import os +import subprocess +import shutil +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class FileChange: + """Represents a file that was created or modified.""" + path: str + content: str + action: str # "create" or "modify" + + +class Coder: + """ + Code generation and manipulation module. + Works with local git clones for efficient file operations. + """ + + def __init__(self, local_repo_path: str): + self.local_repo_path = local_repo_path + + def apply_changes(self, changes: dict[str, str]) -> list[FileChange]: + """ + Apply a dict of {filepath: content} changes to the local repo. + Returns list of FileChange objects. + """ + applied = [] + for path, content in changes.items(): + full_path = os.path.join(self.local_repo_path, path) + os.makedirs(os.path.dirname(full_path), exist_ok=True) + with open(full_path, "w", encoding="utf-8") as f: + f.write(content) + action = "modify" if os.path.exists(full_path) else "create" + applied.append(FileChange(path=path, content=content, action=action)) + return applied + + def read_file(self, path: str) -> Optional[str]: + """Read a file from the local repo.""" + try: + full_path = os.path.join(self.local_repo_path, path) + with open(full_path, "r", encoding="utf-8") as f: + return f.read() + except Exception: + return None + + def file_exists(self, path: str) -> bool: + """Check if a file exists in the local repo.""" + return os.path.exists(os.path.join(self.local_repo_path, path)) + + def list_directory(self, path: str = "") -> list[str]: + """List files in a directory (non-recursive).""" + full_path = os.path.join(self.local_repo_path, path) + if not os.path.isdir(full_path): + return [] + return os.listdir(full_path) + + def get_codebase_structure(self, max_depth: int = 3, exclude_dirs: list[str] = None) -> str: + """ + Generate a tree-style overview of the codebase structure. + """ + exclude_dirs = exclude_dirs or [ + "node_modules", ".git", "__pycache__", ".venv", "venv", + "dist", "build", ".pytest_cache", "*.pyc", ".next" + ] + + lines = [] + for root, dirs, files in os.walk(self.local_repo_path): + # Filter exclude dirs in-place + dirs[:] = [d for d in dirs if not any( + ex in d or d.endswith(ex.replace("*", "")) + for ex in exclude_dirs + )] + + depth = root.replace(self.local_repo_path, "").count(os.sep) + if depth > max_depth: + continue + + indent = " " * depth + rel_path = os.path.relpath(root, self.local_repo_path) + if rel_path == ".": + rel_path = "/" + lines.append(f"{indent}{os.path.basename(root)}/") + + file_indent = " " * (depth + 1) + for f in sorted(files): + if not any(ex in f for ex in exclude_dirs): + lines.append(f"{file_indent}{f}") + + return "\n".join(lines) + + def run_command(self, cmd: list[str], timeout: int = 120) -> tuple[int, str, str]: + """ + Run a shell command in the repo directory. + Returns (exit_code, stdout, stderr). + """ + try: + result = subprocess.run( + cmd, + cwd=self.local_repo_path, + capture_output=True, + text=True, + timeout=timeout + ) + return result.returncode, result.stdout, result.stderr + except subprocess.TimeoutExpired: + return -1, "", "Command timed out" + except Exception as e: + return -1, "", str(e) + + def install_deps(self) -> tuple[int, str, str]: + """Install dependencies if a package file exists.""" + if os.path.exists(os.path.join(self.local_repo_path, "package.json")): + return self.run_command(["npm", "install"], timeout=300) + elif os.path.exists(os.path.join(self.local_repo_path, "requirements.txt")): + return self.run_command(["pip", "install", "-r", "requirements.txt"], timeout=300) + elif os.path.exists(os.path.join(self.local_repo_path, "pyproject.toml")): + return self.run_command(["pip", "install", "-e", "."], timeout=300) + return (0, "No dependency file found", "") + + def stage_and_commit(self, files: list[str], message: str) -> tuple[int, str]: + """ + Git add and commit files. Returns (exit_code, output). + """ + # Add files + code, out, err = self.run_command(["git", "add", "--"] + files) + if code != 0: + return code, f"git add failed: {err}" + + # Commit + code, out, err = self.run_command(["git", "commit", "-m", message]) + if code != 0: + return code, f"git commit failed: {err}" + + return 0, out + + def push_branch(self, branch: str) -> tuple[int, str]: + """Push the current branch to origin.""" + code, out, err = self.run_command(["git", "push", "-u", "origin", branch]) + return code, out + err + + def get_staged_files(self) -> list[str]: + """Get list of currently staged files.""" + code, out, err = self.run_command(["git", "diff", "--cached", "--name-only"]) + if code == 0: + return [f.strip() for f in out.split("\n") if f.strip()] + return [] + + def get_changed_files(self) -> list[str]: + """Get list of all changed (staged + unstaged) files.""" + code, out, err = self.run_command(["git", "diff", "--name-only"]) + files = [] + if code == 0: + files.extend([f.strip() for f in out.split("\n") if f.strip()]) + code2, out2, _ = self.run_command(["git", "diff", "--cached", "--name-only"]) + if code2 == 0: + files.extend([f.strip() for f in out2.split("\n") if f.strip() and f.strip() not in files]) + return files diff --git a/automaton/bounty_hunter/github_client.py b/automaton/bounty_hunter/github_client.py new file mode 100644 index 000000000..cc917820c --- /dev/null +++ b/automaton/bounty_hunter/github_client.py @@ -0,0 +1,279 @@ +""" +GitHub client for the Bounty Hunter Agent. +Handles all GitHub API interactions: listing bounties, reading issues, +creating branches, committing code, and submitting PRs. +""" + +import os +import base64 +import re +from dataclasses import dataclass, field +from typing import Optional +from enum import Enum + + +class BountyTier(Enum): + TIER_1 = "tier-1" + TIER_2 = "tier-2" + TIER_3 = "tier-3" + + +@dataclass +class Bounty: + number: int + title: str + body: str + labels: list[str] + tier: Optional[BountyTier] = None + reward: Optional[str] = None + domain: Optional[str] = None + assignee: Optional[str] = None + state: str = "open" + + @classmethod + def from_issue(cls, issue: dict) -> "Bounty": + labels = [l.get("name", "") for l in issue.get("labels", [])] + tier = None + for label in labels: + if "tier-1" in label.lower(): + tier = BountyTier.TIER_1 + elif "tier-2" in label.lower(): + tier = BountyTier.TIER_2 + elif "tier-3" in label.lower(): + tier = BountyTier.TIER_3 + + reward_match = re.search( + r"(?:reward[^\n:]*:\s*)?([\$€£]?\s*\d+(?:,\d{3})*(?:\.\d+)?\s*[MKmk]?\s*(?:USD|FNDRY|USDC|\$)?)", + issue.get("body", ""), + re.IGNORECASE, + ) + reward = reward_match.group(1).strip() if reward_match else None + + domain_labels = ["agent", "frontend", "backend", "integration", "security", "devops"] + domain = next((l for l in labels if l.lower() in domain_labels), None) + + return cls( + number=issue.get("number"), + title=issue.get("title", ""), + body=issue.get("body", ""), + labels=labels, + tier=tier, + reward=reward, + domain=domain, + assignee=issue.get("assignee", {}).get("login") if issue.get("assignee") else None, + state=issue.get("state", "open"), + ) + + +@dataclass +class GitHubClient: + """ + GitHub API client for bounty hunting operations. + Reads token from GITHUB_TOKEN env var. + """ + owner: str = "SolFoundry" + repo: str = "solfoundry" + token: str = field(default_factory=lambda: os.environ.get("GITHUB_TOKEN", "")) + upstream_owner: str = "SolFoundry" + upstream_repo: str = "solfoundry" + + def __post_init__(self): + if not self.token: + raise ValueError("GITHUB_TOKEN environment variable not set") + self._headers = { + "Authorization": f"Bearer {self.token}", + "Accept": "application/vnd.github+json", + "X-GitHub-Api-Version": "2022-11-28", + } + + def _graphql_query(self, query: str, variables: dict = None) -> dict: + """Execute a GraphQL query against the GitHub API.""" + import urllib.request + import json + + payload = {"query": query} + if variables: + payload["variables"] = variables + + req = urllib.request.Request( + "https://api.github.com/graphql", + data=json.dumps(payload).encode(), + headers=self._headers, + method="POST" + ) + with urllib.request.urlopen(req, timeout=30) as resp: + return json.loads(resp.read()) + + def _rest_request(self, method: str, path: str, data: dict = None, params: dict = None) -> dict: + """Execute a REST API request against the GitHub API.""" + import urllib.request + import urllib.parse + import json + + url = f"https://api.github.com/{path}" + if params: + url += "?" + urllib.parse.urlencode(params) + + payload = json.dumps(data).encode() if data else None + req = urllib.request.Request(url, data=payload, headers=self._headers, method=method) + with urllib.request.urlopen(req, timeout=30) as resp: + return json.loads(resp.read()) + + def list_open_bounties(self, min_tier: str = "tier-1") -> list[Bounty]: + """ + List all open bounty issues from the SolFoundry repo. + Filter by minimum tier (default: tier-1 = all bounties). + """ + all_issues = [] + page = 1 + per_page = 100 + + while True: + issues = self._rest_request( + "GET", + f"repos/{self.upstream_owner}/{self.upstream_repo}/issues", + params={"state": "open", "labels": "bounty", "per_page": per_page, "page": page} + ) + if not issues: + break + all_issues.extend(issues) + if len(issues) < per_page: + break + page += 1 + + bounties = [Bounty.from_issue(i) for i in all_issues] + + # Filter out PRs and assigned bounties + bounties = [b for b in bounties if b.assignee is None] + + return bounties + + def get_issue(self, issue_number: int) -> dict: + """Get a single issue by number.""" + return self._rest_request( + "GET", + f"repos/{self.upstream_owner}/{self.upstream_repo}/issues/{issue_number}" + ) + + def get_file_content(self, path: str, ref: str = "main") -> str: + """Get the content of a file from the repo.""" + data = self._rest_request( + "GET", + f"repos/{self.upstream_owner}/{self.upstream_repo}/contents/{path}", + params={"ref": ref} + ) + return base64.b64decode(data["content"]).decode("utf-8") + + def create_branch(self, branch_name: str, from_ref: str = "main") -> str: + """ + Create a new branch. Returns the branch name. + """ + # Get the SHA of the base ref + ref_data = self._rest_request( + "GET", + f"repos/{self.upstream_owner}/{self.upstream_repo}/git/refs/heads/{from_ref}" + ) + sha = ref_data["object"]["sha"] + + # Create new branch + self._rest_request( + "POST", + f"repos/{self.owner}/{self.repo}/git/refs", + data={ + "ref": f"refs/heads/{branch_name}", + "sha": sha + } + ) + return branch_name + + def update_file(self, path: str, content: str, message: str, branch: str, sha: str = None) -> dict: + """ + Create or update a file in the repo. + If sha is not provided, will try to get it first (for updates). + """ + if sha is None: + try: + existing = self._rest_request( + "GET", + f"repos/{self.owner}/{self.repo}/contents/{path}", + params={"ref": branch} + ) + sha = existing["sha"] + except Exception: + sha = None + + data = { + "message": message, + "content": base64.b64encode(content.encode()).decode(), + "branch": branch, + } + if sha: + data["sha"] = sha + + return self._rest_request( + "PUT", + f"repos/{self.owner}/{self.repo}/contents/{path}", + data=data + ) + + def create_pull_request( + self, + title: str, + body: str, + head_branch: str, + base_branch: str = "main" + ) -> dict: + """ + Create a pull request from the fork to the upstream repo. + """ + return self._rest_request( + "POST", + f"repos/{self.upstream_owner}/{self.upstream_repo}/pulls", + data={ + "title": title, + "body": body, + "head": f"{self.owner}:{head_branch}", + "base": base_branch, + } + ) + + def add_pr_comment(self, pr_number: int, body: str) -> dict: + """Add a comment to a pull request.""" + return self._rest_request( + "POST", + f"repos/{self.upstream_owner}/{self.upstream_repo}/issues/{pr_number}/comments", + data={"body": body} + ) + + def get_pr(self, pr_number: int) -> dict: + """Get a PR by number.""" + return self._rest_request( + "GET", + f"repos/{self.upstream_owner}/{self.upstream_repo}/pulls/{pr_number}" + ) + + def get_user_repos(self) -> list[dict]: + """Get list of repos for the authenticated user.""" + repos = [] + page = 1 + while True: + data = self._rest_request( + "GET", + "user/repos", + params={"per_page": 100, "page": page, "sort": "updated"} + ) + if not data: + break + repos.extend(data) + if len(data) < 100: + break + page += 1 + return repos + + def fork_exists(self) -> bool: + """Check if the fork exists.""" + try: + self._rest_request("GET", f"repos/{self.owner}/{self.repo}") + return True + except Exception: + return False diff --git a/automaton/bounty_hunter/planner.py b/automaton/bounty_hunter/planner.py new file mode 100644 index 000000000..9d7ca1598 --- /dev/null +++ b/automaton/bounty_hunter/planner.py @@ -0,0 +1,237 @@ +""" +Planning module for the Bounty Hunter Agent. +Uses an LLM to analyze bounties and generate implementation plans. +""" + +import os +import json +import urllib.request +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class ImplementationStep: + """A single step in the implementation plan.""" + order: int + description: str + files_to_modify: list[str] + files_to_create: list[str] + notes: str = "" + + +@dataclass +class ImplementationPlan: + """A complete implementation plan for a bounty.""" + bounty_number: int + title: str + summary: str + steps: list[ImplementationStep] + tech_stack: list[str] + estimated_complexity: str # "low", "medium", "high" + testing_approach: str + + +class Planner: + """ + LLM-powered planning module. + Uses GPT-4 or compatible API to generate implementation plans. + """ + + def __init__( + self, + model: str = "gpt-4", + api_key: str = None, + api_base: str = None + ): + self.model = model or os.environ.get("LLM_MODEL", "gpt-4") + self.api_key = api_key or os.environ.get("OPENAI_API_KEY", "") + self.api_base = api_base or os.environ.get("OPENAI_API_BASE", "https://api.openai.com/v1") + + def _call_llm(self, messages: list[dict], temperature: float = 0.3) -> str: + """Make an LLM API call.""" + if not self.api_key: + # Fallback to a simple heuristic planner if no API key + return self._fallback_plan(messages) + + payload = { + "model": self.model, + "messages": messages, + "temperature": temperature, + } + + req = urllib.request.Request( + f"{self.api_base.rstrip('/')}/chat/completions", + data=json.dumps(payload).encode(), + headers={ + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + }, + method="POST" + ) + + with urllib.request.urlopen(req, timeout=60) as resp: + result = json.loads(resp.read()) + return result["choices"][0]["message"]["content"] + + def _fallback_plan(self, messages: list[dict]) -> str: + """ + Fallback planner when no LLM API key is available. + Uses the issue body and title to create a basic plan. + """ + # Extract the last user message (the planning request) + user_msg = messages[-1]["content"] if messages else "" + return json.dumps({ + "summary": "Basic implementation plan generated (no LLM API key)", + "steps": [ + {"order": 1, "description": "Analyze codebase structure", "files_to_modify": [], "files_to_create": []}, + {"order": 2, "description": "Implement core functionality", "files_to_modify": [], "files_to_create": []}, + {"order": 3, "description": "Add tests", "files_to_modify": [], "files_to_create": []}, + ], + "tech_stack": ["python"], + "estimated_complexity": "medium", + "testing_approach": "Unit tests with pytest" + }) + + def create_plan( + self, + bounty_body: str, + bounty_title: str, + bounty_number: int, + codebase_structure: str = None, + relevant_files: dict[str, str] = None + ) -> ImplementationPlan: + """ + Create an implementation plan for a bounty. + + Args: + bounty_body: The full body/description of the bounty issue + bounty_title: The title of the bounty issue + bounty_number: The GitHub issue number + codebase_structure: Optional overview of the codebase + relevant_files: Optional dict of {filepath: content} for context files + """ + + context_parts = [f"## Bounty Issue #{bounty_number}: {bounty_title}\n\n{bounty_body}"] + + if codebase_structure: + context_parts.append(f"\n## Codebase Structure:\n{codebase_structure}") + + if relevant_files: + context_parts.append("\n## Relevant Code Files:") + for path, content in list(relevant_files.items())[:5]: # Limit to 5 files + context_parts.append(f"\n### {path}:\n```\n{content[:2000]}\n```") + + planning_prompt = f""" +You are a senior software engineer planning an implementation for a GitHub bounty issue. + +Analyze the bounty requirements and create a detailed implementation plan. + +{chr(10).join(context_parts)} + +Generate a JSON implementation plan with this exact structure: +{{ + "summary": "Brief summary of what will be built (1-2 sentences)", + "steps": [ + {{ + "order": 1, + "description": "What this step does", + "files_to_modify": ["path/to/file1.py", "path/to/file2.ts"], + "files_to_create": ["path/to/new_file.py"], + "notes": "Any important considerations" + }} + ], + "tech_stack": ["python", "fastapi", "react"], + "estimated_complexity": "low|medium|high", + "testing_approach": "How to test this feature" +}} + +Respond ONLY with valid JSON. No markdown, no explanation. +""" + + messages = [ + {"role": "system", "content": "You are an expert software architect. Create clear, actionable implementation plans in JSON format."}, + {"role": "user", "content": planning_prompt} + ] + + try: + response = self._call_llm(messages) + # Extract JSON from response + json_start = response.find("{") + json_end = response.rfind("}") + 1 + if json_start >= 0 and json_end > json_start: + plan_data = json.loads(response[json_start:json_end]) + else: + plan_data = json.loads(response) + except Exception as e: + # Fallback + plan_data = json.loads(self._fallback_plan(messages)) + + steps = [ + ImplementationStep( + order=s["order"], + description=s["description"], + files_to_modify=s.get("files_to_modify", []), + files_to_create=s.get("files_to_create", []), + notes=s.get("notes", "") + ) + for s in plan_data.get("steps", []) + ] + + return ImplementationPlan( + bounty_number=bounty_number, + title=bounty_title, + summary=plan_data.get("summary", ""), + steps=steps, + tech_stack=plan_data.get("tech_stack", []), + estimated_complexity=plan_data.get("estimated_complexity", "medium"), + testing_approach=plan_data.get("testing_approach", "Unit tests") + ) + + def generate_code( + self, + plan: ImplementationPlan, + codebase_context: str, + step: ImplementationStep + ) -> dict[str, str]: + """ + Generate code for a specific implementation step. + Returns dict of {filepath: code_content}. + """ + prompt = f""" +You are implementing step {step.order} of a bounty hunter agent. + +## Plan Summary +{plan.summary} + +## Current Step +{step.description} + +Files to modify: {step.files_to_modify} +Files to create: {step.files_to_create} +Notes: {step.notes} + +## Codebase Context +{codebase_context[:8000]} + +Generate the code for this step. Output as JSON dict: +{{"filepath1": "full file content", "filepath2": "full file content"}} + +Use proper language syntax. Include docstrings. Write complete, production-ready code. +Respond ONLY with valid JSON. +""" + + messages = [ + {"role": "system", "content": "You are an expert coder. Output ONLY valid JSON mapping file paths to their full code content."}, + {"role": "user", "content": prompt} + ] + + try: + response = self._call_llm(messages, temperature=0.4) + json_start = response.find("{") + json_end = response.rfind("}") + 1 + if json_start >= 0 and json_end > json_start: + return json.loads(response[json_start:json_end]) + return json.loads(response) + except Exception: + return {} diff --git a/automaton/bounty_hunter/test_bounty_hunter.py b/automaton/bounty_hunter/test_bounty_hunter.py new file mode 100644 index 000000000..96a0fd6cb --- /dev/null +++ b/automaton/bounty_hunter/test_bounty_hunter.py @@ -0,0 +1,50 @@ +from automaton.bounty_hunter.github_client import Bounty, BountyTier +from automaton.bounty_hunter.planner import Planner +from automaton.bounty_hunter.tester import Tester + + +def test_bounty_from_issue_parses_tier_and_reward(): + issue = { + "number": 861, + "title": "🏭 Bounty T3: Full Autonomous Bounty-Hunting Agent", + "body": "**Reward:** 1M $FNDRY | **Tier:** T3 | **Domain:** Agent", + "labels": [ + {"name": "bounty"}, + {"name": "tier-3"}, + {"name": "agent"}, + ], + "state": "open", + "assignee": None, + } + + bounty = Bounty.from_issue(issue) + + assert bounty.number == 861 + assert bounty.tier == BountyTier.TIER_3 + assert bounty.domain == "agent" + assert bounty.state == "open" + assert bounty.assignee is None + assert bounty.reward is not None + + +def test_planner_fallback_returns_valid_plan(): + planner = Planner(api_key="") + + plan = planner.create_plan( + bounty_body="Build a bot that scans bounties and opens PRs.", + bounty_title="Autonomous bounty hunter", + bounty_number=861, + codebase_structure="automaton/\n README.md", + ) + + assert plan.bounty_number == 861 + assert plan.title == "Autonomous bounty hunter" + assert plan.summary + assert len(plan.steps) >= 1 + assert plan.estimated_complexity in {"low", "medium", "high"} + + +def test_tester_detects_pytest_for_repo(): + tester = Tester(".") + framework = tester.detect_test_framework() + assert framework == "pytest" diff --git a/automaton/bounty_hunter/tester.py b/automaton/bounty_hunter/tester.py new file mode 100644 index 000000000..f109a733c --- /dev/null +++ b/automaton/bounty_hunter/tester.py @@ -0,0 +1,219 @@ +""" +Tester module for the Bounty Hunter Agent. +Handles test execution and validation. +""" + +import os +import re +import subprocess +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class TestResult: + """Result of a test run.""" + passed: bool + total_tests: int = 0 + passed_tests: int = 0 + failed_tests: int = 0 + errors: int = 0 + output: str = "" + duration: float = 0.0 + + @property + def summary(self) -> str: + return ( + f"passed={self.passed}, total={self.total_tests}, " + f"passed_tests={self.passed_tests}, failed={self.failed_tests}, errors={self.errors}" + ) + + +class Tester: + """ + Test execution module. + Detects test framework and runs appropriate test commands. + """ + + __test__ = False + + def __init__(self, local_repo_path: str): + self.local_repo_path = local_repo_path + + def _run_command(self, cmd: list[str], timeout: int = 180) -> subprocess.CompletedProcess: + """Run a command in the repo directory.""" + return subprocess.run( + cmd, + cwd=self.local_repo_path, + capture_output=True, + text=True, + timeout=timeout, + env={**os.environ, "TERM": "dumb"} + ) + + def detect_test_framework(self) -> Optional[str]: + """ + Detect which test framework the project uses. + Returns: "pytest", "jest", "vitest", "unittest", "mocha", or None + """ + has_pytest = any( + os.path.exists(os.path.join(self.local_repo_path, f)) + for f in ["pytest.ini", "pyproject.toml", "setup.cfg", "conftest.py"] + ) + if has_pytest: + return "pytest" + + has_requirements_txt = os.path.exists(os.path.join(self.local_repo_path, "requirements.txt")) + if has_requirements_txt: + with open(os.path.join(self.local_repo_path, "requirements.txt"), encoding="utf-8") as f: + content = f.read() + if "pytest" in content: + return "pytest" + + if os.path.exists(os.path.join(self.local_repo_path, "package.json")): + with open(os.path.join(self.local_repo_path, "package.json"), encoding="utf-8") as f: + content = f.read() + if '"vitest"' in content: + return "vitest" + if '"jest"' in content: + return "jest" + if '"mocha"' in content: + return "mocha" + + for root, _, files in os.walk(self.local_repo_path): + for file in files: + if file.startswith("test_") and file.endswith(".py"): + path = os.path.join(root, file) + try: + with open(path, encoding="utf-8") as f: + content = f.read() + if "unittest" in content or "pytest" in content or "assert " in content: + return "pytest" + except Exception: + continue + + return None + + def run_tests( + self, + test_path: str = None, + framework: str = None, + verbose: bool = True + ) -> TestResult: + framework = framework or self.detect_test_framework() + + if not framework: + return TestResult( + passed=False, + output="No test framework detected. Cannot run tests." + ) + + cmd = self._build_command(framework, test_path, verbose) + + try: + result = self._run_command(cmd, timeout=300) + return self._parse_result(framework, result, passed=result.returncode == 0) + except subprocess.TimeoutExpired: + return TestResult(passed=False, output="Tests timed out after 5 minutes.") + except Exception as e: + return TestResult(passed=False, output=f"Error running tests: {str(e)}") + + def _build_command(self, framework: str, test_path: str, verbose: bool) -> list[str]: + if framework == "pytest": + base = ["python", "-m", "pytest"] + if verbose: + base.extend(["-v", "--tb=short"]) + if test_path: + base.append(test_path) + else: + base.extend(["-q"]) + return base + + if framework == "jest": + base = ["npx", "jest"] + if verbose: + base.append("--verbose") + if test_path: + base.extend([test_path, "--no-coverage"]) + else: + base.append("--no-coverage") + return base + + if framework == "vitest": + base = ["npx", "vitest", "run"] + if test_path: + base.append(test_path) + return base + + if framework == "mocha": + base = ["npx", "mocha"] + if test_path: + base.extend([test_path, "--reporter", "spec"]) + else: + base.extend(["--reporter", "spec"]) + return base + + if framework == "unittest": + base = ["python", "-m", "unittest"] + if test_path: + base.append(test_path) + else: + base.append("discover") + return base + + return ["python", "-m", "pytest", "-q"] + + def _parse_result(self, framework: str, result: subprocess.CompletedProcess, passed: bool) -> TestResult: + output = result.stdout + "\n" + result.stderr + total = passed_tests = failed = errors = 0 + + if framework == "pytest": + match = re.search(r"(\d+)\s+passed", output) + if match: + passed_tests = int(match.group(1)) + match = re.search(r"(\d+)\s+failed", output) + if match: + failed = int(match.group(1)) + match = re.search(r"(\d+)\s+error", output) + if match: + errors = int(match.group(1)) + total = passed_tests + failed + errors + + elif framework in ("jest", "vitest"): + match = re.search(r"Tests:\s+(\d+)\s+passed", output) + if match: + passed_tests = int(match.group(1)) + match = re.search(r"(\d+)\s+failed", output) + if match: + failed = int(match.group(1)) + total = passed_tests + failed + + return TestResult( + passed=passed and failed == 0 and errors == 0, + total_tests=total, + passed_tests=passed_tests, + failed_tests=failed, + errors=errors, + output=output[-3000:], + ) + + def run_lint(self, linter: str = "ruff") -> TestResult: + coder_path = os.path.join(self.local_repo_path, "automaton", "bounty_hunter") + if not os.path.exists(coder_path): + return TestResult(passed=True, output="No bounty_hunter code to lint") + + if linter == "ruff": + cmd = ["python", "-m", "ruff", "check", coder_path] + elif linter == "flake8": + cmd = ["python", "-m", "flake8", coder_path] + elif linter == "eslint": + cmd = ["npx", "eslint", coder_path] + else: + return TestResult(passed=True, output=f"Unknown linter: {linter}") + + try: + result = self._run_command(cmd, timeout=60) + passed = result.returncode == 0 + return TestResult(passed=passed, output=result.stdout + "\n" + result.stderr) + except Exception as e: + return TestResult(passed=False, output=str(e))