diff --git a/codegen-on-oss/README.md b/codegen-on-oss/README.md index a7700eb77..98dceecfa 100644 --- a/codegen-on-oss/README.md +++ b/codegen-on-oss/README.md @@ -1,337 +1,138 @@ -# Overview +# Codegen on OSS The **Codegen on OSS** package provides a modular pipeline that: -- **Collects repository URLs** from different sources (e.g., CSV files or GitHub searches). - **Parses repositories** using the codegen tool. - **Profiles performance** and logs metrics for each parsing run. - **Logs errors** to help pinpoint parsing failures or performance bottlenecks. +- **Analyzes codebases** with comprehensive static analysis. +- **Compares codebases** to identify differences between repositories or branches. ______________________________________________________________________ -## Package Structure +## Overview The package is composed of several modules: -- `sources` - - - Defines the Repository source classes and settings. Settings are all configurable via environment variables - - - Github Source - - ```python - class GithubSettings(SourceSettings): - language: Literal["python", "typescript"] = "python" - heuristic: Literal[ - "stars", - "forks", - "updated", - # "watchers", - # "contributors", - # "commit_activity", - # "issues", - # "dependency", - ] = "stars" - github_token: str | None = None - ``` - - - The three options available now are the three supported by the Github API. - - Future Work Additional options will require different strategies - - - CSV Source - - - Simply reads repo URLs from CSV - -- `cache` - - - Currently only specifies the cache directory. It is used for caching git repositories pulled by the pipeline `--force-pull` can be used to re-pull from the remote. - - `cli` - - Built with Click, the CLI provides two main commands: - - `run-one`: Parses a single repository specified by URL. - - `run`: Iterates over repositories obtained from a selected source and parses each one. - -- **`metrics`** - - - Provides profiling tools to measure performance during the parse: - - `MetricsProfiler`: A context manager that creates a profiling session. - - `MetricsProfile`: Represents a "span" or a "run" of a specific repository. Records step-by-step metrics (clock duration, CPU time, memory usage) and writes them to a CSV file specified by `--output-path` - -- **`parser`** - - Contains the `CodegenParser` class that orchestrates the parsing process: - - - Clones the repository (or forces a pull if specified). - - Initializes a `Codebase` (from the codegen tool). - - Runs post-initialization validation. - - Integrates with the `MetricsProfiler` to log measurements at key steps. - -______________________________________________________________________ - -## Getting Started - -1. **Configure the Repository Source** + - Command-line interface for the package + - Supports parsing repositories, analyzing codebases, and comparing codebases - Decide whether you want to read from a CSV file or query GitHub: +- `codebase_analyzer` - - For CSV, ensure that your CSV file (default: `input.csv`) exists and contains repository URLs in its first column \[`repo_url`\] and commit hash \[`commit_hash`\] (or empty) in the second column. - - For GitHub, configure your desired settings (e.g., `language`, `heuristic`, and optionally a GitHub token) via environment variables (`GITHUB_` prefix) + - Comprehensive static code analysis for a single codebase + - Analyzes code structure, dependencies, quality, and more -1. 
**Run the Parser** +- `codebase_comparator` - Use the CLI to start parsing: + - Compares two codebases and identifies differences + - Can compare different repositories or different branches of the same repository - - To parse one repository: +- `analysis_viewer_cli` - ```bash - uv run cgparse run-one --help - ``` + - Interactive command-line interface for codebase analysis + - Provides a user-friendly way to analyze and compare codebases - - To parse multiple repositories from a source: +- `analysis_viewer_web` - ```bash - uv run cgparse run --help - ``` + - Web-based interface for codebase analysis + - Allows users to analyze and compare codebases through a browser -1. **Review Metrics and Logs** +## Usage - After parsing, check the CSV (default: `metrics.csv` ) to review performance measurements per repository. Error logs are written to the specified error output file (default: `errors.log`) +### Parsing Repositories -______________________________________________________________________ - -## Running on Modal +```bash +# Parse repositories from a CSV file +codegen-on-oss parse --source csv -```shell -$ uv run modal run modal_run.py +# Parse repositories from GitHub +codegen-on-oss parse --source github --limit 10 ``` -Codegen runs this parser on modal using the CSV source file `input.csv` tracked in this repository. - -### Modal Configuration - -- **Compute Resources**: Allocates 4 CPUs and 16GB of memory. -- **Secrets & Volumes**: Uses secrets (for bucket credentials) and mounts a volume for caching repositories. -- **Image Setup**: Builds on a Debian slim image with Python 3.12, installs required packages (`uv` and `git` ) -- **Environment Configuration**: Environment variables (e.g., GitHub settings) are injected at runtime. - -The function `parse_repo_on_modal` performs the following steps: +### Analyzing a Codebase -1. **Environment Setup**: Updates environment variables and configures logging using Loguru. -1. **Source Initialization**: Creates a repository source based on the provided type (e.g., GitHub). -1. **Metrics Profiling**: Instantiates `MetricsProfiler` to capture and log performance data. -1. **Repository Parsing**: Iterates over repository URLs and parses each using the `CodegenParser`. -1. **Error Handling**: Logs any exceptions encountered during parsing. -1. **Result Upload**: Uses the `BucketStore` class to upload the configuration, logs, and metrics to an S3 bucket. +```bash +# Analyze a repository by URL +codegen-on-oss analyze --repo-url https://github.com/username/repo -### Bucket Storage +# Analyze a local repository +codegen-on-oss analyze --repo-path /path/to/local/repo -**Bucket (public):** [codegen-oss-parse](https://s3.amazonaws.com/codegen-oss-parse/) - -The results of each run are saved under the version of `codegen` lib that the run installed and the source type it was run with. 
Within this prefix: - -- Source Settings - - `https://s3.amazonaws.com/codegen-oss-parse/{version}/{source}/config.json` -- Metrics - - `https://s3.amazonaws.com/codegen-oss-parse/{version}/{source}/metrics.csv` -- Logs - - `https://s3.amazonaws.com/codegen-oss-parse/{version}/{source}/output.logs` +# Specify output format and file +codegen-on-oss analyze --repo-url https://github.com/username/repo --output-format html --output-file report.html +``` -______________________________________________________________________ +### Comparing Codebases -### Running it yourself +```bash +# Compare two repositories +codegen-on-oss compare --base-repo-url https://github.com/username/repo1 --compare-repo-url https://github.com/username/repo2 -You can also run `modal_run.py` yourself. It is designed to be run via Modal for cloud-based parsing. It offers additional configuration methods: +# Compare two branches of the same repository +codegen-on-oss compare --base-repo-url https://github.com/username/repo --base-branch main --compare-branch feature-branch -```shell -$ uv run modal run modal_run.py +# Specify output format and file +codegen-on-oss compare --base-repo-url https://github.com/username/repo1 --compare-repo-url https://github.com/username/repo2 --output-format html --output-file comparison.html ``` -- **CSV and Repository Volumes:** - The script defines two Modal volumes: +### Interactive Mode - - `codegen-oss-input-volume`: For uploading and reloading CSV inputs. - - `codegen-oss-repo-volume`: For caching repository data during parsing. - The repository and input volume names are configurable via environment variables (`CODEGEN_MODAL_REPO_VOLUME` and `CODEGEN_MODAL_INPUT_VOLUME`). - -- **Secrets Handling:** - The script loads various credentials via Modal secrets. It first checks for a pre-configured Modal secret (`codegen-oss-bucket-credentials` configurable via environment variable `CODEGEN_MODAL_SECRET_NAME`) and falls back to dynamically created Modal secret from local `.env` or environment variables if not found. +```bash +# Run in interactive mode +codegen-on-oss interactive +``` -- **Entrypoint Parameters:** - The main function supports multiple source types: +### Web Interface - - **csv:** Uploads a CSV file (`--csv-file input.csv`) for batch processing. - - **single:** Parses a single repository specified by its URL (`--single-url "https://github.com/codegen-sh/codegen-sdk.git"`) and an optional commit hash (`--single-commit ...`) - - **github:** Uses GitHub settings, language (`--github-language python`) and heuristic (`--github-heuristic stars`) to query for top repositories. +```bash +# Launch the web interface +codegen-on-oss web -- **Result Storage:** - Upon completion, logs and metrics are automatically uploaded to the S3 bucket specified by the environment variable `BUCKET_NAME` (default: `codegen-oss-parse`). This allows for centralized storage and easy retrieval of run outputs. The AWS Credentials provided in the secret are used for this operation. +# Create a shareable link +codegen-on-oss web --share -______________________________________________________________________ - -## Extensibility +# Don't open the browser automatically +codegen-on-oss web --no-browser +``` -**Adding New Sources:** +## Analysis Categories -You can define additional repository sources by subclassing `RepoSource` and providing a corresponding settings class. Make sure to set the `source_type` and register your new source by following the pattern established in `CSVInputSource` or `GithubSource`. 
+The codebase analyzer and comparator support the following categories of analysis: -**Improving Testing:** +- **Codebase Structure**: File counts, language distribution, directory structure, etc. +- **Symbol Level**: Function parameters, return types, complexity metrics, etc. +- **Dependency Flow**: Function call relationships, entry point analysis, etc. +- **Code Quality**: Unused functions, repeated code patterns, refactoring opportunities, etc. +- **Visualization**: Module dependencies, symbol dependencies, call hierarchies, etc. +- **Language Specific**: Decorator usage, type hint coverage, etc. +- **Code Metrics**: Cyclomatic complexity, Halstead volume, maintainability index, etc. -The detailed metrics collected can help you understand where parsing failures occur or where performance lags. Use these insights to improve error handling and optimize the codegen parsing logic. +## Installation -**Containerization and Automation:** +```bash +# Install from PyPI +pip install codegen-on-oss -There is a Dockerfile that can be used to create an image capable of running the parse tests. Runtime environment variables can be used to configure the run and output. +# Install from source +git clone https://github.com/username/codegen-on-oss.git +cd codegen-on-oss +pip install -e . +``` -**Input & Configuration** +## Development -Explore a better CLI for providing options to the Modal run. +```bash +# Install development dependencies +pip install -e ".[dev]" -______________________________________________________________________ +# Run tests +pytest -## Example Log Output - -```shell -[codegen-on-oss*] codegen/codegen-on-oss/$ uv run cgparse run --source csv - 21:32:36 INFO Cloning repository https://github.com/JohnSnowLabs/spark-nlp.git - 21:36:57 INFO { - "profile_name": "https://github.com/JohnSnowLabs/spark-nlp.git", - "step": "codebase_init", - "delta_time": 7.186550649999845, - "cumulative_time": 7.186550649999845, - "cpu_time": 180.3553702, - "memory_usage": 567525376, - "memory_delta": 317095936, - "error": null -} - 21:36:58 INFO { - "profile_name": "https://github.com/JohnSnowLabs/spark-nlp.git", - "step": "post_init_validation", - "delta_time": 0.5465090990001045, - "cumulative_time": 7.733059748999949, - "cpu_time": 180.9174761, - "memory_usage": 569249792, - "memory_delta": 1724416, - "error": null -} - 21:36:58 ERROR Repository: https://github.com/JohnSnowLabs/spark-nlp.git -Traceback (most recent call last): - - File "/home/codegen/codegen/codegen-on-oss/.venv/bin/cgparse", line 10, in - sys.exit(cli()) - │ │ └ - │ └ - └ - File "/home/codegen/codegen/codegen-on-oss/.venv/lib/python3.12/site-packages/click/core.py", line 1161, in __call__ - return self.main(*args, **kwargs) - │ │ │ └ {} - │ │ └ () - │ └ - └ - File "/home/codegen/codegen/codegen-on-oss/.venv/lib/python3.12/site-packages/click/core.py", line 1082, in main - rv = self.invoke(ctx) - │ │ └ - │ └ - └ - File "/home/codegen/codegen/codegen-on-oss/.venv/lib/python3.12/site-packages/click/core.py", line 1697, in invoke - return _process_result(sub_ctx.command.invoke(sub_ctx)) - │ │ │ │ └ - │ │ │ └ - │ │ └ - │ └ - └ ._process_result at 0x7f466597fb00> - File "/home/codegen/codegen/codegen-on-oss/.venv/lib/python3.12/site-packages/click/core.py", line 1443, in invoke - return ctx.invoke(self.callback, **ctx.params) - │ │ │ │ │ └ {'source': 'csv', 'output_path': 'metrics.csv', 'error_output_path': 'errors.log', 'cache_dir': PosixPath('/home/.cache... 
- │ │ │ │ └ - │ │ │ └ - │ │ └ - │ └ - └ - File "/home/codegen/codegen/codegen-on-oss/.venv/lib/python3.12/site-packages/click/core.py", line 788, in invoke - return __callback(*args, **kwargs) - │ └ {'source': 'csv', 'output_path': 'metrics.csv', 'error_output_path': 'errors.log', 'cache_dir': PosixPath('/home/.cache... - └ () - - File "/home/codegen/codegen/codegen-on-oss/codegen_on_oss/cli.py", line 121, in run - parser.parse(repo_url) - │ │ └ 'https://github.com/JohnSnowLabs/spark-nlp.git' - │ └ - └ - - File "/home/codegen/codegen/codegen-on-oss/codegen_on_oss/parser.py", line 52, in parse - with self.metrics_profiler.start_profiler( - │ │ └ - │ └ - └ - - File "/home/.local/share/uv/python/cpython-3.12.6-linux-x86_64-gnu/lib/python3.12/contextlib.py", line 158, in __exit__ - self.gen.throw(value) - │ │ │ └ ParseRunError() - │ │ └ - │ └ - └ - -> File "/home/codegen/codegen/codegen-on-oss/codegen_on_oss/metrics.py", line 41, in start_profiler - yield profile - └ - - File "/home/codegen/codegen/codegen-on-oss/codegen_on_oss/parser.py", line 64, in parse - raise ParseRunError(validation_status) - │ └ - └ - -codegen_on_oss.parser.ParseRunError: LOW_IMPORT_RESOLUTION_RATE - 21:36:58 INFO { - "profile_name": "https://github.com/JohnSnowLabs/spark-nlp.git", - "step": "TOTAL", - "delta_time": 7.740976418000173, - "cumulative_time": 7.740976418000173, - "cpu_time": 180.9221699, - "memory_usage": 569249792, - "memory_delta": 0, - "error": "LOW_IMPORT_RESOLUTION_RATE" -} - 21:36:58 INFO Cloning repository https://github.com/Lightning-AI/lightning.git - 21:37:53 INFO { - "profile_name": "https://github.com/Lightning-AI/lightning.git", - "step": "codebase_init", - "delta_time": 24.256577352999557, - "cumulative_time": 24.256577352999557, - "cpu_time": 211.3604081, - "memory_usage": 1535971328, - "memory_delta": 966184960, - "error": null -} - 21:37:53 INFO { - "profile_name": "https://github.com/Lightning-AI/lightning.git", - "step": "post_init_validation", - "delta_time": 0.137609629000508, - "cumulative_time": 24.394186982000065, - "cpu_time": 211.5082702, - "memory_usage": 1536241664, - "memory_delta": 270336, - "error": null -} - 21:37:53 INFO { - "profile_name": "https://github.com/Lightning-AI/lightning.git", - "step": "TOTAL", - "delta_time": 24.394700584999555, - "cumulative_time": 24.394700584999555, - "cpu_time": 211.5088282, - "memory_usage": 1536241664, - "memory_delta": 0, - "error": null -} +# Run linters +black . +isort . +mypy . +ruff . 
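+# note: on recent Ruff releases the lint command is `ruff check .`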
``` - -## Example Metrics Output - -| profile_name | step | delta_time | cumulative_time | cpu_time | memory_usage | memory_delta | error | -| ---------------------- | -------------------- | ------------------ | ------------------ | ----------- | ------------ | ------------ | -------------------------- | -| JohnSnowLabs/spark-nlp | codebase_init | 7.186550649999845 | 7.186550649999845 | 180.3553702 | 567525376 | 317095936 | | -| JohnSnowLabs/spark-nlp | post_init_validation | 0.5465090990001045 | 7.733059748999949 | 180.9174761 | 569249792 | 1724416 | | -| JohnSnowLabs/spark-nlp | TOTAL | 7.740976418000173 | 7.740976418000173 | 180.9221699 | 569249792 | 0 | LOW_IMPORT_RESOLUTION_RATE | -| Lightning-AI/lightning | codebase_init | 24.256577352999557 | 24.256577352999557 | 211.3604081 | 1535971328 | 966184960 | | -| Lightning-AI/lightning | post_init_validation | 0.137609629000508 | 24.394186982000065 | 211.5082702 | 1536241664 | 270336 | | -| Lightning-AI/lightning | TOTAL | 24.394700584999555 | 24.394700584999555 | 211.5088282 | 1536241664 | 0 | | diff --git a/codegen-on-oss/codegen_on_oss/analysis/__init__.py b/codegen-on-oss/codegen_on_oss/analysis/__init__.py new file mode 100644 index 000000000..42f44cf9f --- /dev/null +++ b/codegen-on-oss/codegen_on_oss/analysis/__init__.py @@ -0,0 +1,10 @@ +""" +Analysis package for codegen-on-oss. + +This package provides tools for analyzing and comparing codebases. +""" + +from .codebase_analyzer import CodebaseAnalyzer as BaseCodebaseAnalyzer +from .optimized_analyzer import CodebaseAnalyzer + +__all__ = ["CodebaseAnalyzer", "BaseCodebaseAnalyzer"] diff --git a/codegen-on-oss/codegen_on_oss/analysis/codebase_analyzer.py b/codegen-on-oss/codegen_on_oss/analysis/codebase_analyzer.py new file mode 100755 index 000000000..2c3355d18 --- /dev/null +++ b/codegen-on-oss/codegen_on_oss/analysis/codebase_analyzer.py @@ -0,0 +1,1994 @@ +#!/usr/bin/env python3 +""" +Comprehensive Codebase Analyzer + +This module provides a complete static code analysis system using the Codegen SDK. +It analyzes a codebase and provides extensive information about its structure, +dependencies, code quality, and more. +""" + +import os +import sys +import json +import time +import logging +import argparse +import tempfile +import datetime +import re +import math +import networkx as nx +from pathlib import Path +from typing import Dict, List, Set, Tuple, Any, Optional, Union, Callable +from collections import Counter, defaultdict +import matplotlib.pyplot as plt +from rich.console import Console +from rich.table import Table +from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn + +try: + from codegen.sdk.core.codebase import Codebase + from codegen.configs.models.codebase import CodebaseConfig + from codegen.configs.models.secrets import SecretsConfig + from codegen.shared.enums.programming_language import ProgrammingLanguage +except ImportError: + print("Codegen SDK not found. 
Please install it first.") + sys.exit(1) + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[logging.StreamHandler()] +) +logger = logging.getLogger(__name__) + +# Constants +METRICS_CATEGORIES = { + "codebase_structure": [ + "get_file_count", + "get_files_by_language", + "get_file_size_distribution", + "get_directory_structure", + "get_symbol_count", + "get_symbol_type_distribution", + "get_symbol_hierarchy", + "get_top_level_vs_nested_symbols", + "get_import_dependency_map", + "get_external_vs_internal_dependencies", + "get_circular_imports", + "get_unused_imports", + "get_module_coupling_metrics", + "get_module_cohesion_analysis", + "get_package_structure", + "get_module_dependency_graph", + ], + "symbol_level": [ + "get_function_parameter_analysis", + "get_return_type_analysis", + "get_function_complexity_metrics", + "get_call_site_tracking", + "get_async_function_detection", + "get_function_overload_analysis", + "get_inheritance_hierarchy", + "get_method_analysis", + "get_attribute_analysis", + "get_constructor_analysis", + "get_interface_implementation_verification", + "get_access_modifier_usage", + "get_type_inference", + "get_usage_tracking", + "get_scope_analysis", + "get_constant_vs_mutable_usage", + "get_global_variable_detection", + "get_type_alias_resolution", + "get_generic_type_usage", + "get_type_consistency_checking", + "get_union_intersection_type_analysis", + ], + "dependency_flow": [ + "get_function_call_relationships", + "get_call_hierarchy_visualization", + "get_entry_point_analysis", + "get_dead_code_detection", + "get_variable_usage_tracking", + "get_data_transformation_paths", + "get_input_output_parameter_analysis", + "get_conditional_branch_analysis", + "get_loop_structure_analysis", + "get_exception_handling_paths", + "get_return_statement_analysis", + "get_symbol_reference_tracking", + "get_usage_frequency_metrics", + "get_cross_file_symbol_usage", + ], + "code_quality": [ + "get_unused_functions", + "get_unused_classes", + "get_unused_variables", + "get_unused_imports", + "get_similar_function_detection", + "get_repeated_code_patterns", + "get_refactoring_opportunities", + "get_cyclomatic_complexity", + "get_cognitive_complexity", + "get_nesting_depth_analysis", + "get_function_size_metrics", + "get_naming_convention_consistency", + "get_comment_coverage", + "get_documentation_completeness", + "get_code_formatting_consistency", + ], + "visualization": [ + "get_module_dependency_visualization", + "get_symbol_dependency_visualization", + "get_import_relationship_graphs", + "get_function_call_visualization", + "get_call_hierarchy_trees", + "get_entry_point_flow_diagrams", + "get_class_hierarchy_visualization", + "get_symbol_relationship_diagrams", + "get_package_structure_visualization", + "get_code_complexity_heat_maps", + "get_usage_frequency_visualization", + "get_change_frequency_analysis", + ], + "language_specific": [ + "get_decorator_usage_analysis", + "get_dynamic_attribute_access_detection", + "get_type_hint_coverage", + "get_magic_method_usage", + "get_interface_implementation_verification", + "get_type_definition_completeness", + "get_jsx_tsx_component_analysis", + "get_type_narrowing_pattern_detection", + ], + "code_metrics": [ + "get_monthly_commits", + "calculate_cyclomatic_complexity", + "cc_rank", + "get_operators_and_operands", + "calculate_halstead_volume", + "count_lines", + "calculate_maintainability_index", + "get_maintainability_rank", + ] +} + +class 
CodebaseAnalyzer: + """ + Comprehensive codebase analyzer using Codegen SDK. + + This class provides methods to analyze a codebase and extract detailed information + about its structure, dependencies, code quality, and more. + """ + + def __init__(self, repo_url: str = None, repo_path: str = None, language: str = None): + """ + Initialize the CodebaseAnalyzer. + + Args: + repo_url: URL of the repository to analyze + repo_path: Local path to the repository to analyze + language: Programming language of the codebase (auto-detected if not provided) + """ + self.repo_url = repo_url + self.repo_path = repo_path + self.language = language + self.codebase = None + self.console = Console() + self.results = {} + + # Initialize the codebase + if repo_url: + self._init_from_url(repo_url, language) + elif repo_path: + self._init_from_path(repo_path, language) + + def _init_from_url(self, repo_url: str, language: str = None): + """Initialize codebase from a repository URL.""" + try: + # Extract owner and repo name from URL + if repo_url.endswith('.git'): + repo_url = repo_url[:-4] + + parts = repo_url.rstrip('/').split('/') + repo_name = parts[-1] + owner = parts[-2] + repo_full_name = f"{owner}/{repo_name}" + + # Create a temporary directory for cloning + tmp_dir = tempfile.mkdtemp(prefix="codebase_analyzer_") + + # Configure the codebase + config = CodebaseConfig( + debug=False, + allow_external=True, + py_resolve_syspath=True, + ) + + secrets = SecretsConfig() + + # Initialize the codebase + self.console.print(f"[bold green]Initializing codebase from {repo_url}...[/bold green]") + + prog_lang = None + if language: + prog_lang = ProgrammingLanguage(language.upper()) + + self.codebase = Codebase.from_github( + repo_full_name=repo_full_name, + tmp_dir=tmp_dir, + language=prog_lang, + config=config, + secrets=secrets, + full_history=True + ) + + self.console.print(f"[bold green]Successfully initialized codebase from {repo_url}[/bold green]") + + except Exception as e: + self.console.print(f"[bold red]Error initializing codebase from URL: {e}[/bold red]") + raise + + def _init_from_path(self, repo_path: str, language: str = None): + """Initialize codebase from a local repository path.""" + try: + # Configure the codebase + config = CodebaseConfig( + debug=False, + allow_external=True, + py_resolve_syspath=True, + ) + + secrets = SecretsConfig() + + # Initialize the codebase + self.console.print(f"[bold green]Initializing codebase from {repo_path}...[/bold green]") + + prog_lang = None + if language: + prog_lang = ProgrammingLanguage(language.upper()) + + self.codebase = Codebase( + repo_path=repo_path, + language=prog_lang, + config=config, + secrets=secrets + ) + + self.console.print(f"[bold green]Successfully initialized codebase from {repo_path}[/bold green]") + + except Exception as e: + self.console.print(f"[bold red]Error initializing codebase from path: {e}[/bold red]") + raise + + def analyze(self, categories: List[str] = None, output_format: str = "json", output_file: str = None): + """ + Perform a comprehensive analysis of the codebase. + + Args: + categories: List of categories to analyze. If None, all categories are analyzed. + output_format: Format of the output (json, html, console) + output_file: Path to the output file + + Returns: + Dict containing the analysis results + """ + if not self.codebase: + raise ValueError("Codebase not initialized. 
Please initialize the codebase first.") + + # If no categories specified, analyze all + if not categories: + categories = list(METRICS_CATEGORIES.keys()) + + # Initialize results dictionary + self.results = { + "metadata": { + "repo_name": self.codebase.ctx.repo_name, + "analysis_time": datetime.datetime.now().isoformat(), + "language": str(self.codebase.ctx.programming_language), + }, + "categories": {} + } + + # Analyze each category + with Progress( + SpinnerColumn(), + TextColumn("[bold blue]{task.description}"), + BarColumn(), + TextColumn("[bold green]{task.completed}/{task.total}"), + TimeElapsedColumn(), + ) as progress: + task = progress.add_task("[bold green]Analyzing codebase...", total=len(categories)) + + for category in categories: + if category not in METRICS_CATEGORIES: + self.console.print(f"[bold yellow]Warning: Unknown category '{category}'. Skipping.[/bold yellow]") + progress.update(task, advance=1) + continue + + self.console.print(f"[bold blue]Analyzing {category}...[/bold blue]") + + # Get the metrics for this category + metrics = METRICS_CATEGORIES[category] + category_results = {} + + # Run each metric + for metric in metrics: + try: + method = getattr(self, metric, None) + if method and callable(method): + result = method() + category_results[metric] = result + else: + category_results[metric] = {"error": f"Method {metric} not implemented"} + except Exception as e: + category_results[metric] = {"error": str(e)} + + # Add the results to the main results dictionary + self.results["categories"][category] = category_results + + progress.update(task, advance=1) + + # Output the results + if output_format == "json": + if output_file: + with open(output_file, 'w') as f: + json.dump(self.results, f, indent=2) + self.console.print(f"[bold green]Results saved to {output_file}[/bold green]") + else: + return self.results + elif output_format == "html": + self._generate_html_report(output_file) + elif output_format == "console": + self._print_console_report() + + return self.results + + # + # Codebase Structure Analysis Methods + # + + def get_file_count(self) -> Dict[str, int]: + """Get the total number of files in the codebase.""" + files = list(self.codebase.files) + return { + "total_files": len(files), + "source_files": len([f for f in files if not f.is_binary]) + } + + def get_files_by_language(self) -> Dict[str, int]: + """Get the distribution of files by language/extension.""" + files = list(self.codebase.files) + extensions = {} + + for file in files: + if file.is_binary: + continue + + ext = file.extension + if not ext: + ext = "(no extension)" + + if ext in extensions: + extensions[ext] += 1 + else: + extensions[ext] = 1 + + return extensions + + def get_file_size_distribution(self) -> Dict[str, int]: + """Get the distribution of file sizes.""" + files = list(self.codebase.files) + size_ranges = { + "small (< 1KB)": 0, + "medium (1KB - 10KB)": 0, + "large (10KB - 100KB)": 0, + "very large (> 100KB)": 0 + } + + for file in files: + if file.is_binary: + continue + + size = len(file.content) + + if size < 1024: + size_ranges["small (< 1KB)"] += 1 + elif size < 10240: + size_ranges["medium (1KB - 10KB)"] += 1 + elif size < 102400: + size_ranges["large (10KB - 100KB)"] += 1 + else: + size_ranges["very large (> 100KB)"] += 1 + + return size_ranges + + def get_directory_structure(self) -> Dict[str, Any]: + """Get the directory structure of the codebase.""" + directories = {} + + for directory in self.codebase.directories: + path = str(directory.path) + parent_path = 
str(directory.path.parent) if directory.path.parent != self.codebase.repo_path else "/" + + if parent_path not in directories: + directories[parent_path] = [] + + directories[parent_path].append({ + "name": directory.path.name, + "path": path, + "files": len(directory.files), + "subdirectories": len(directory.subdirectories) + }) + + return directories + + def get_symbol_count(self) -> Dict[str, int]: + """Get the total count of symbols in the codebase.""" + return { + "total_symbols": len(list(self.codebase.symbols)), + "classes": len(list(self.codebase.classes)), + "functions": len(list(self.codebase.functions)), + "global_vars": len(list(self.codebase.global_vars)), + "interfaces": len(list(self.codebase.interfaces)) + } + + def get_symbol_type_distribution(self) -> Dict[str, int]: + """Get the distribution of symbol types.""" + symbols = list(self.codebase.symbols) + distribution = {} + + for symbol in symbols: + symbol_type = str(symbol.symbol_type) + + if symbol_type in distribution: + distribution[symbol_type] += 1 + else: + distribution[symbol_type] = 1 + + return distribution + + def get_symbol_hierarchy(self) -> Dict[str, Any]: + """Get the hierarchy of symbols in the codebase.""" + classes = list(self.codebase.classes) + hierarchy = {} + + for cls in classes: + class_name = cls.name + parent_classes = [] + + # Get parent classes if available + if hasattr(cls, "parent_class_names"): + parent_classes = cls.parent_class_names + + hierarchy[class_name] = { + "parent_classes": parent_classes, + "methods": [method.name for method in cls.methods], + "attributes": [attr.name for attr in cls.attributes] if hasattr(cls, "attributes") else [] + } + + return hierarchy + + def get_top_level_vs_nested_symbols(self) -> Dict[str, int]: + """Get the count of top-level vs nested symbols.""" + symbols = list(self.codebase.symbols) + top_level = 0 + nested = 0 + + for symbol in symbols: + if hasattr(symbol, "is_top_level") and symbol.is_top_level: + top_level += 1 + else: + nested += 1 + + return { + "top_level": top_level, + "nested": nested + } + + def get_import_dependency_map(self) -> Dict[str, List[str]]: + """Get a map of import dependencies.""" + files = list(self.codebase.files) + dependency_map = {} + + for file in files: + if file.is_binary: + continue + + file_path = file.file_path + imports = [] + + for imp in file.imports: + if hasattr(imp, "imported_symbol") and imp.imported_symbol: + imported_symbol = imp.imported_symbol + if hasattr(imported_symbol, "file") and imported_symbol.file: + imports.append(imported_symbol.file.file_path) + + dependency_map[file_path] = imports + + return dependency_map + + def get_external_vs_internal_dependencies(self) -> Dict[str, int]: + """Get the count of external vs internal dependencies.""" + files = list(self.codebase.files) + internal = 0 + external = 0 + + for file in files: + if file.is_binary: + continue + + for imp in file.imports: + if hasattr(imp, "imported_symbol") and imp.imported_symbol: + imported_symbol = imp.imported_symbol + if hasattr(imported_symbol, "file") and imported_symbol.file: + internal += 1 + else: + external += 1 + else: + external += 1 + + return { + "internal": internal, + "external": external + } + + def get_circular_imports(self) -> List[List[str]]: + """Detect circular imports in the codebase.""" + files = list(self.codebase.files) + dependency_map = {} + + # Build dependency graph + for file in files: + if file.is_binary: + continue + + file_path = file.file_path + imports = [] + + for imp in file.imports: + if 
hasattr(imp, "imported_symbol") and imp.imported_symbol: + imported_symbol = imp.imported_symbol + if hasattr(imported_symbol, "file") and imported_symbol.file: + imports.append(imported_symbol.file.file_path) + + dependency_map[file_path] = imports + + # Create a directed graph + G = nx.DiGraph() + + # Add nodes and edges + for file_path, imports in dependency_map.items(): + G.add_node(file_path) + for imp in imports: + G.add_edge(file_path, imp) + + # Find cycles + cycles = list(nx.simple_cycles(G)) + + return cycles + + def get_unused_imports(self) -> List[Dict[str, str]]: + """Get a list of unused imports.""" + files = list(self.codebase.files) + unused_imports = [] + + for file in files: + if file.is_binary: + continue + + for imp in file.imports: + if hasattr(imp, "usages") and len(imp.usages) == 0: + unused_imports.append({ + "file": file.file_path, + "import": imp.source + }) + + return unused_imports + + def get_module_coupling_metrics(self) -> Dict[str, float]: + """Calculate module coupling metrics.""" + files = list(self.codebase.files) + dependency_map = {} + + # Build dependency graph + for file in files: + if file.is_binary: + continue + + file_path = file.file_path + imports = [] + + for imp in file.imports: + if hasattr(imp, "imported_symbol") and imp.imported_symbol: + imported_symbol = imp.imported_symbol + if hasattr(imported_symbol, "file") and imported_symbol.file: + imports.append(imported_symbol.file.file_path) + + dependency_map[file_path] = imports + + # Calculate metrics + total_files = len(dependency_map) + total_dependencies = sum(len(deps) for deps in dependency_map.values()) + + if total_files == 0: + return { + "average_dependencies_per_file": 0, + "max_dependencies": 0, + "coupling_factor": 0 + } + + max_dependencies = max(len(deps) for deps in dependency_map.values()) if dependency_map else 0 + coupling_factor = total_dependencies / (total_files * (total_files - 1)) if total_files > 1 else 0 + + return { + "average_dependencies_per_file": total_dependencies / total_files, + "max_dependencies": max_dependencies, + "coupling_factor": coupling_factor + } + + def get_module_cohesion_analysis(self) -> Dict[str, float]: + """Analyze module cohesion.""" + files = list(self.codebase.files) + cohesion_metrics = {} + + for file in files: + if file.is_binary: + continue + + symbols = list(file.symbols) + total_symbols = len(symbols) + + if total_symbols <= 1: + continue + + # Count internal references + internal_refs = 0 + + for symbol in symbols: + if hasattr(symbol, "symbol_usages"): + for usage in symbol.symbol_usages: + if hasattr(usage, "file") and usage.file == file: + internal_refs += 1 + + max_possible_refs = total_symbols * (total_symbols - 1) + cohesion = internal_refs / max_possible_refs if max_possible_refs > 0 else 0 + + cohesion_metrics[file.file_path] = cohesion + + # Calculate average cohesion + if cohesion_metrics: + avg_cohesion = sum(cohesion_metrics.values()) / len(cohesion_metrics) + else: + avg_cohesion = 0 + + return { + "average_cohesion": avg_cohesion, + "file_cohesion": cohesion_metrics + } + + def get_package_structure(self) -> Dict[str, Any]: + """Get the package structure of the codebase.""" + directories = {} + + for directory in self.codebase.directories: + path = str(directory.path) + parent_path = str(directory.path.parent) if directory.path.parent != self.codebase.repo_path else "/" + + if parent_path not in directories: + directories[parent_path] = [] + + # Check if this is a package (has __init__.py) + is_package = any(f.name == 
"__init__.py" for f in directory.files) + + directories[parent_path].append({ + "name": directory.path.name, + "path": path, + "is_package": is_package, + "files": len(directory.files), + "subdirectories": len(directory.subdirectories) + }) + + return directories + + def get_module_dependency_graph(self) -> Dict[str, List[str]]: + """Get the module dependency graph.""" + files = list(self.codebase.files) + dependency_graph = {} + + for file in files: + if file.is_binary: + continue + + file_path = file.file_path + imports = [] + + for imp in file.imports: + if hasattr(imp, "imported_symbol") and imp.imported_symbol: + imported_symbol = imp.imported_symbol + if hasattr(imported_symbol, "file") and imported_symbol.file: + imports.append(imported_symbol.file.file_path) + + dependency_graph[file_path] = imports + + return dependency_graph + + # + # Symbol-Level Analysis Methods + # + + def get_function_parameter_analysis(self) -> Dict[str, Any]: + """Analyze function parameters.""" + functions = list(self.codebase.functions) + parameter_stats = { + "total_parameters": 0, + "avg_parameters_per_function": 0, + "functions_with_no_parameters": 0, + "functions_with_many_parameters": 0, # > 5 parameters + "parameter_type_coverage": 0, + "functions_with_default_params": 0 + } + + if not functions: + return parameter_stats + + total_params = 0 + functions_with_types = 0 + functions_with_defaults = 0 + + for func in functions: + params = func.parameters + param_count = len(params) + total_params += param_count + + if param_count == 0: + parameter_stats["functions_with_no_parameters"] += 1 + elif param_count > 5: + parameter_stats["functions_with_many_parameters"] += 1 + + # Check for type annotations + has_type_annotations = all(hasattr(p, "type") and p.type for p in params) + if has_type_annotations: + functions_with_types += 1 + + # Check for default values + has_defaults = any(hasattr(p, "default") and p.default for p in params) + if has_defaults: + functions_with_defaults += 1 + + parameter_stats["total_parameters"] = total_params + parameter_stats["avg_parameters_per_function"] = total_params / len(functions) + parameter_stats["parameter_type_coverage"] = functions_with_types / len(functions) if functions else 0 + parameter_stats["functions_with_default_params"] = functions_with_defaults + + return parameter_stats + + def get_return_type_analysis(self) -> Dict[str, Any]: + """Analyze function return types.""" + functions = list(self.codebase.functions) + return_type_stats = { + "functions_with_return_type": 0, + "return_type_coverage": 0, + "common_return_types": {} + } + + if not functions: + return return_type_stats + + functions_with_return_type = 0 + return_types = {} + + for func in functions: + if hasattr(func, "return_type") and func.return_type: + functions_with_return_type += 1 + + return_type = str(func.return_type.source) if hasattr(func.return_type, "source") else str(func.return_type) + + if return_type in return_types: + return_types[return_type] += 1 + else: + return_types[return_type] = 1 + + return_type_stats["functions_with_return_type"] = functions_with_return_type + return_type_stats["return_type_coverage"] = functions_with_return_type / len(functions) + + # Get the most common return types + sorted_types = sorted(return_types.items(), key=lambda x: x[1], reverse=True) + return_type_stats["common_return_types"] = dict(sorted_types[:10]) # Top 10 return types + + return return_type_stats + + def get_function_complexity_metrics(self) -> Dict[str, Any]: + """Calculate function 
complexity metrics.""" + functions = list(self.codebase.functions) + complexity_metrics = { + "avg_function_length": 0, + "max_function_length": 0, + "functions_by_complexity": { + "simple": 0, # < 10 lines + "moderate": 0, # 10-30 lines + "complex": 0, # 30-100 lines + "very_complex": 0 # > 100 lines + } + } + + if not functions: + return complexity_metrics + + total_length = 0 + max_length = 0 + + for func in functions: + # Calculate function length in lines + func_source = func.source + func_lines = func_source.count('\n') + 1 + + total_length += func_lines + max_length = max(max_length, func_lines) + + # Categorize by complexity + if func_lines < 10: + complexity_metrics["functions_by_complexity"]["simple"] += 1 + elif func_lines < 30: + complexity_metrics["functions_by_complexity"]["moderate"] += 1 + elif func_lines < 100: + complexity_metrics["functions_by_complexity"]["complex"] += 1 + else: + complexity_metrics["functions_by_complexity"]["very_complex"] += 1 + + complexity_metrics["avg_function_length"] = total_length / len(functions) + complexity_metrics["max_function_length"] = max_length + + return complexity_metrics + + def get_call_site_tracking(self) -> Dict[str, Any]: + """Track function call sites.""" + functions = list(self.codebase.functions) + call_site_stats = { + "functions_with_no_calls": 0, + "functions_with_many_calls": 0, # > 10 calls + "avg_call_sites_per_function": 0, + "most_called_functions": [] + } + + if not functions: + return call_site_stats + + function_calls = {} + total_calls = 0 + + for func in functions: + if hasattr(func, "call_sites"): + call_count = len(func.call_sites) + total_calls += call_count + + if call_count == 0: + call_site_stats["functions_with_no_calls"] += 1 + elif call_count > 10: + call_site_stats["functions_with_many_calls"] += 1 + + function_calls[func.name] = call_count + + call_site_stats["avg_call_sites_per_function"] = total_calls / len(functions) + + # Get the most called functions + sorted_functions = sorted(function_calls.items(), key=lambda x: x[1], reverse=True) + call_site_stats["most_called_functions"] = [{"name": name, "calls": calls} for name, calls in sorted_functions[:10]] + + return call_site_stats + + def get_async_function_detection(self) -> Dict[str, Any]: + """Detect async functions.""" + functions = list(self.codebase.functions) + async_stats = { + "total_async_functions": 0, + "async_function_percentage": 0, + "async_functions": [] + } + + if not functions: + return async_stats + + async_functions = [] + + for func in functions: + if hasattr(func, "is_async") and func.is_async: + async_functions.append({ + "name": func.name, + "file": func.file.file_path if hasattr(func, "file") else "Unknown" + }) + + async_stats["total_async_functions"] = len(async_functions) + async_stats["async_function_percentage"] = len(async_functions) / len(functions) + async_stats["async_functions"] = async_functions + + return async_stats + + def get_function_overload_analysis(self) -> Dict[str, Any]: + """Analyze function overloads.""" + functions = list(self.codebase.functions) + overload_stats = { + "total_overloaded_functions": 0, + "overloaded_function_percentage": 0, + "overloaded_functions": [] + } + + if not functions: + return overload_stats + + overloaded_functions = [] + function_names = {} + + for func in functions: + name = func.name + + if name in function_names: + function_names[name].append(func) + else: + function_names[name] = [func] + + for name, funcs in function_names.items(): + if len(funcs) > 1: + 
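+                # More than one function object shares this name; it is recorded here as an
+                # overload set (in plain Python this usually means a redefinition or
+                # same-named functions in different files rather than true overloads).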
overloaded_functions.append({ + "name": name, + "overloads": len(funcs), + "file": funcs[0].file.file_path if hasattr(funcs[0], "file") else "Unknown" + }) + + overload_stats["total_overloaded_functions"] = len(overloaded_functions) + overload_stats["overloaded_function_percentage"] = len(overloaded_functions) / len(function_names) if function_names else 0 + overload_stats["overloaded_functions"] = overloaded_functions + + return overload_stats + + def get_inheritance_hierarchy(self) -> Dict[str, Any]: + """Get the inheritance hierarchy of classes.""" + classes = list(self.codebase.classes) + hierarchy = {} + + for cls in classes: + class_name = cls.name + parent_classes = [] + + # Get parent classes if available + if hasattr(cls, "parent_class_names"): + parent_classes = cls.parent_class_names + + hierarchy[class_name] = { + "parent_classes": parent_classes, + "file": cls.file.file_path if hasattr(cls, "file") else "Unknown" + } + + # Build inheritance tree + inheritance_tree = {} + + for class_name, info in hierarchy.items(): + if not info["parent_classes"]: + if class_name not in inheritance_tree: + inheritance_tree[class_name] = [] + else: + for parent in info["parent_classes"]: + if parent not in inheritance_tree: + inheritance_tree[parent] = [] + inheritance_tree[parent].append(class_name) + + return { + "class_hierarchy": hierarchy, + "inheritance_tree": inheritance_tree + } + + def get_method_analysis(self) -> Dict[str, Any]: + """Analyze class methods.""" + classes = list(self.codebase.classes) + method_stats = { + "total_methods": 0, + "avg_methods_per_class": 0, + "classes_with_no_methods": 0, + "classes_with_many_methods": 0, # > 10 methods + "method_types": { + "instance": 0, + "static": 0, + "class": 0, + "property": 0 + } + } + + if not classes: + return method_stats + + total_methods = 0 + + for cls in classes: + methods = cls.methods if hasattr(cls, "methods") else [] + method_count = len(methods) + total_methods += method_count + + if method_count == 0: + method_stats["classes_with_no_methods"] += 1 + elif method_count > 10: + method_stats["classes_with_many_methods"] += 1 + + # Analyze method types + for method in methods: + if hasattr(method, "is_static") and method.is_static: + method_stats["method_types"]["static"] += 1 + elif hasattr(method, "is_class_method") and method.is_class_method: + method_stats["method_types"]["class"] += 1 + elif hasattr(method, "is_property") and method.is_property: + method_stats["method_types"]["property"] += 1 + else: + method_stats["method_types"]["instance"] += 1 + + method_stats["total_methods"] = total_methods + method_stats["avg_methods_per_class"] = total_methods / len(classes) if classes else 0 + + return method_stats + + def get_attribute_analysis(self) -> Dict[str, Any]: + """Analyze class attributes.""" + classes = list(self.codebase.classes) + attribute_stats = { + "total_attributes": 0, + "avg_attributes_per_class": 0, + "classes_with_no_attributes": 0, + "classes_with_many_attributes": 0, # > 10 attributes + "attribute_types": {} + } + + if not classes: + return attribute_stats + + total_attributes = 0 + attribute_types = {} + + for cls in classes: + attributes = cls.attributes if hasattr(cls, "attributes") else [] + attr_count = len(attributes) + total_attributes += attr_count + + if attr_count == 0: + attribute_stats["classes_with_no_attributes"] += 1 + elif attr_count > 10: + attribute_stats["classes_with_many_attributes"] += 1 + + # Analyze attribute types + for attr in attributes: + if hasattr(attr, "type") and 
attr.type: + attr_type = str(attr.type.source) if hasattr(attr.type, "source") else str(attr.type) + + if attr_type in attribute_types: + attribute_types[attr_type] += 1 + else: + attribute_types[attr_type] = 1 + + attribute_stats["total_attributes"] = total_attributes + attribute_stats["avg_attributes_per_class"] = total_attributes / len(classes) if classes else 0 + attribute_stats["attribute_types"] = attribute_types + + return attribute_stats + + def get_constructor_analysis(self) -> Dict[str, Any]: + """Analyze class constructors.""" + classes = list(self.codebase.classes) + constructor_stats = { + "classes_with_constructor": 0, + "constructor_percentage": 0, + "avg_constructor_params": 0 + } + + if not classes: + return constructor_stats + + classes_with_constructor = 0 + total_constructor_params = 0 + + for cls in classes: + constructor = None + + # Find constructor + for method in cls.methods: + if hasattr(method, "is_constructor") and method.is_constructor: + constructor = method + break + + if constructor: + classes_with_constructor += 1 + param_count = len(constructor.parameters) if hasattr(constructor, "parameters") else 0 + total_constructor_params += param_count + + constructor_stats["classes_with_constructor"] = classes_with_constructor + constructor_stats["constructor_percentage"] = classes_with_constructor / len(classes) + constructor_stats["avg_constructor_params"] = total_constructor_params / classes_with_constructor if classes_with_constructor else 0 + + return constructor_stats + + def get_interface_implementation_verification(self) -> Dict[str, Any]: + """Verify interface implementations.""" + classes = list(self.codebase.classes) + interfaces = list(self.codebase.interfaces) + implementation_stats = { + "total_interfaces": len(interfaces), + "classes_implementing_interfaces": 0, + "interface_implementations": {} + } + + if not interfaces or not classes: + return implementation_stats + + # Map interfaces to implementing classes + interface_implementations = {} + + for interface in interfaces: + interface_name = interface.name + implementing_classes = [] + + for cls in classes: + if hasattr(cls, "parent_class_names") and interface_name in cls.parent_class_names: + implementing_classes.append(cls.name) + + interface_implementations[interface_name] = implementing_classes + + # Count classes implementing interfaces + classes_implementing = set() + for implementers in interface_implementations.values(): + classes_implementing.update(implementers) + + implementation_stats["classes_implementing_interfaces"] = len(classes_implementing) + implementation_stats["interface_implementations"] = interface_implementations + + return implementation_stats + + def get_access_modifier_usage(self) -> Dict[str, Any]: + """Analyze access modifier usage.""" + symbols = list(self.codebase.symbols) + access_stats = { + "public": 0, + "private": 0, + "protected": 0, + "internal": 0, + "unknown": 0 + } + + for symbol in symbols: + if hasattr(symbol, "is_private") and symbol.is_private: + access_stats["private"] += 1 + elif hasattr(symbol, "is_protected") and symbol.is_protected: + access_stats["protected"] += 1 + elif hasattr(symbol, "is_internal") and symbol.is_internal: + access_stats["internal"] += 1 + elif hasattr(symbol, "is_public") and symbol.is_public: + access_stats["public"] += 1 + else: + access_stats["unknown"] += 1 + + return access_stats + + # + # Code Quality Analysis Methods + # + + def get_unused_functions(self) -> List[Dict[str, str]]: + """Get a list of unused functions.""" + 
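+        # Heuristic: a function with zero recorded call sites is reported as unused.
+        # Dynamically dispatched, re-exported, or framework-invoked functions can
+        # therefore show up here as false positives.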
functions = list(self.codebase.functions) + unused_functions = [] + + for func in functions: + if hasattr(func, "call_sites") and len(func.call_sites) == 0: + # Skip special methods like __init__, __str__, etc. + if hasattr(func, "is_magic") and func.is_magic: + continue + + # Skip entry points and main functions + if func.name in ["main", "__main__"]: + continue + + unused_functions.append({ + "name": func.name, + "file": func.file.file_path if hasattr(func, "file") else "Unknown" + }) + + return unused_functions + + def get_unused_classes(self) -> List[Dict[str, str]]: + """Get a list of unused classes.""" + classes = list(self.codebase.classes) + unused_classes = [] + + for cls in classes: + if hasattr(cls, "symbol_usages") and len(cls.symbol_usages) == 0: + unused_classes.append({ + "name": cls.name, + "file": cls.file.file_path if hasattr(cls, "file") else "Unknown" + }) + + return unused_classes + + def get_unused_variables(self) -> List[Dict[str, str]]: + """Get a list of unused variables.""" + global_vars = list(self.codebase.global_vars) + unused_vars = [] + + for var in global_vars: + if hasattr(var, "symbol_usages") and len(var.symbol_usages) == 0: + unused_vars.append({ + "name": var.name, + "file": var.file.file_path if hasattr(var, "file") else "Unknown" + }) + + return unused_vars + + def get_unused_imports(self) -> List[Dict[str, str]]: + """Get a list of unused imports.""" + files = list(self.codebase.files) + unused_imports = [] + + for file in files: + if file.is_binary: + continue + + for imp in file.imports: + if hasattr(imp, "usages") and len(imp.usages) == 0: + unused_imports.append({ + "file": file.file_path, + "import": imp.source + }) + + return unused_imports + + def get_similar_function_detection(self) -> List[Dict[str, Any]]: + """Detect similar functions.""" + functions = list(self.codebase.functions) + similar_functions = [] + + # Group functions by name + function_groups = {} + + for func in functions: + name = func.name + + if name in function_groups: + function_groups[name].append(func) + else: + function_groups[name] = [func] + + # Find similar functions + for name, funcs in function_groups.items(): + if len(funcs) > 1: + similar_functions.append({ + "name": name, + "count": len(funcs), + "files": [func.file.file_path if hasattr(func, "file") else "Unknown" for func in funcs] + }) + + return similar_functions + + def get_repeated_code_patterns(self) -> Dict[str, Any]: + """Detect repeated code patterns.""" + functions = list(self.codebase.functions) + + # This is a simplified implementation that looks for functions with similar structure + # A more advanced implementation would use code clone detection algorithms + + # Group functions by length (in lines) + functions_by_length = {} + + for func in functions: + func_source = func.source + func_lines = func_source.count('\n') + 1 + + if func_lines in functions_by_length: + functions_by_length[func_lines].append(func) + else: + functions_by_length[func_lines] = [func] + + # Find potential code clones (functions with same length) + potential_clones = {} + + for length, funcs in functions_by_length.items(): + if len(funcs) > 1: + potential_clones[length] = [func.name for func in funcs] + + return { + "potential_code_clones": potential_clones + } + + def get_refactoring_opportunities(self) -> Dict[str, Any]: + """Identify refactoring opportunities.""" + refactoring_opportunities = { + "long_functions": [], + "large_classes": [], + "high_coupling_files": [], + "low_cohesion_files": [] + } + + # Find long 
functions + functions = list(self.codebase.functions) + for func in functions: + func_source = func.source + func_lines = func_source.count('\n') + 1 + + if func_lines > 50: # Threshold for long functions + refactoring_opportunities["long_functions"].append({ + "name": func.name, + "file": func.file.file_path if hasattr(func, "file") else "Unknown", + "lines": func_lines + }) + + # Find large classes + classes = list(self.codebase.classes) + for cls in classes: + methods = cls.methods if hasattr(cls, "methods") else [] + attributes = cls.attributes if hasattr(cls, "attributes") else [] + + if len(methods) + len(attributes) > 20: # Threshold for large classes + refactoring_opportunities["large_classes"].append({ + "name": cls.name, + "file": cls.file.file_path if hasattr(cls, "file") else "Unknown", + "methods": len(methods), + "attributes": len(attributes) + }) + + # Find high coupling files + files = list(self.codebase.files) + for file in files: + if file.is_binary: + continue + + imports = file.imports + if len(imports) > 15: # Threshold for high coupling + refactoring_opportunities["high_coupling_files"].append({ + "file": file.file_path, + "imports": len(imports) + }) + + # Find low cohesion files + cohesion_metrics = self.get_module_cohesion_analysis() + file_cohesion = cohesion_metrics.get("file_cohesion", {}) + + for file_path, cohesion in file_cohesion.items(): + if cohesion < 0.3: # Threshold for low cohesion + refactoring_opportunities["low_cohesion_files"].append({ + "file": file_path, + "cohesion": cohesion + }) + + return refactoring_opportunities + + def calculate_cyclomatic_complexity(self) -> Dict[str, Any]: + """Calculate cyclomatic complexity for functions.""" + functions = list(self.codebase.functions) + complexity_results = { + "avg_complexity": 0, + "max_complexity": 0, + "complexity_distribution": { + "low": 0, # 1-5 + "moderate": 0, # 6-10 + "high": 0, # 11-20 + "very_high": 0 # > 20 + }, + "complex_functions": [] + } + + if not functions: + return complexity_results + + total_complexity = 0 + max_complexity = 0 + complex_functions = [] + + for func in functions: + # A simple approximation of cyclomatic complexity + # In a real implementation, we would parse the AST and count decision points + source = func.source + + # Count decision points + if_count = source.count("if ") + source.count("elif ") + for_count = source.count("for ") + while_count = source.count("while ") + case_count = source.count("case ") + source.count("switch ") + source.count("match ") + catch_count = source.count("catch ") + source.count("except ") + and_count = source.count(" && ") + source.count(" and ") + or_count = source.count(" || ") + source.count(" or ") + + # Calculate complexity + complexity = 1 + if_count + for_count + while_count + case_count + catch_count + and_count + or_count + + total_complexity += complexity + max_complexity = max(max_complexity, complexity) + + # Categorize complexity + if complexity <= 5: + complexity_results["complexity_distribution"]["low"] += 1 + elif complexity <= 10: + complexity_results["complexity_distribution"]["moderate"] += 1 + elif complexity <= 20: + complexity_results["complexity_distribution"]["high"] += 1 + else: + complexity_results["complexity_distribution"]["very_high"] += 1 + + # Track complex functions + if complexity > 10: + complex_functions.append({ + "name": func.name, + "file": func.file.file_path if hasattr(func, "file") else "Unknown", + "complexity": complexity + }) + + complexity_results["avg_complexity"] = total_complexity / 
len(functions) + complexity_results["max_complexity"] = max_complexity + complexity_results["complex_functions"] = sorted(complex_functions, key=lambda x: x["complexity"], reverse=True)[:10] # Top 10 most complex + + return complexity_results + + def cc_rank(self) -> Dict[str, str]: + """Rank the codebase based on cyclomatic complexity.""" + complexity_results = self.calculate_cyclomatic_complexity() + avg_complexity = complexity_results["avg_complexity"] + + if avg_complexity < 5: + rank = "A" + description = "Excellent: Low complexity, highly maintainable code" + elif avg_complexity < 10: + rank = "B" + description = "Good: Moderate complexity, maintainable code" + elif avg_complexity < 15: + rank = "C" + description = "Fair: Moderate to high complexity, some maintenance challenges" + elif avg_complexity < 20: + rank = "D" + description = "Poor: High complexity, difficult to maintain" + else: + rank = "F" + description = "Very Poor: Very high complexity, extremely difficult to maintain" + + return { + "rank": rank, + "description": description, + "avg_complexity": avg_complexity + } + + def get_operators_and_operands(self) -> Dict[str, Any]: + """Get operators and operands for Halstead metrics.""" + files = list(self.codebase.files) + + # Define common operators + operators = ["+", "-", "*", "/", "%", "=", "==", "!=", "<", ">", "<=", ">=", + "&&", "||", "!", "&", "|", "^", "~", "<<", ">>", "++", "--", + "+=", "-=", "*=", "/=", "%=", "&=", "|=", "^=", "<<=", ">>="] + + # Count operators and operands + operator_count = {} + operand_count = {} + + for file in files: + if file.is_binary: + continue + + content = file.content + + # Count operators + for op in operators: + count = content.count(op) + if count > 0: + if op in operator_count: + operator_count[op] += count + else: + operator_count[op] = count + + # Simplified operand counting (this is a rough approximation) + # In a real implementation, we would parse the AST and extract identifiers + words = re.findall(r'\b[a-zA-Z_][a-zA-Z0-9_]*\b', content) + for word in words: + if word not in ["if", "else", "for", "while", "return", "break", "continue", + "class", "def", "function", "import", "from", "as", "try", + "except", "finally", "with", "in", "is", "not", "and", "or"]: + if word in operand_count: + operand_count[word] += 1 + else: + operand_count[word] = 1 + + return { + "unique_operators": len(operator_count), + "total_operators": sum(operator_count.values()), + "unique_operands": len(operand_count), + "total_operands": sum(operand_count.values()), + "top_operators": dict(sorted(operator_count.items(), key=lambda x: x[1], reverse=True)[:10]), + "top_operands": dict(sorted(operand_count.items(), key=lambda x: x[1], reverse=True)[:10]) + } + + def calculate_halstead_volume(self) -> Dict[str, float]: + """Calculate Halstead volume metrics.""" + operators_and_operands = self.get_operators_and_operands() + + n1 = operators_and_operands["unique_operators"] + n2 = operators_and_operands["unique_operands"] + N1 = operators_and_operands["total_operators"] + N2 = operators_and_operands["total_operands"] + + # Calculate Halstead metrics + vocabulary = n1 + n2 + length = N1 + N2 + volume = length * math.log2(vocabulary) if vocabulary > 0 else 0 + difficulty = (n1 / 2) * (N2 / n2) if n2 > 0 else 0 + effort = volume * difficulty + time = effort / 18 # Time in seconds (18 is a constant from empirical studies) + bugs = volume / 3000 # Estimated bugs (3000 is a constant from empirical studies) + + return { + "vocabulary": vocabulary, + "length": 
length,
+            "volume": volume,
+            "difficulty": difficulty,
+            "effort": effort,
+            "time": time,  # in seconds
+            "bugs": bugs
+        }
+
+    def count_lines(self) -> Dict[str, int]:
+        """Count lines of code."""
+        files = list(self.codebase.files)
+
+        total_lines = 0
+        code_lines = 0
+        comment_lines = 0
+        blank_lines = 0
+
+        for file in files:
+            if file.is_binary:
+                continue
+
+            content = file.content
+            lines = content.split('\n')
+
+            total_lines += len(lines)
+
+            for line in lines:
+                line = line.strip()
+
+                if not line:
+                    blank_lines += 1
+                elif line.startswith('#') or line.startswith('//') or line.startswith('/*') or line.startswith('*'):
+                    comment_lines += 1
+                else:
+                    code_lines += 1
+
+        return {
+            "total_lines": total_lines,
+            "code_lines": code_lines,
+            "comment_lines": comment_lines,
+            "blank_lines": blank_lines,
+            "comment_ratio": comment_lines / code_lines if code_lines > 0 else 0
+        }
+
+    def calculate_maintainability_index(self) -> Dict[str, float]:
+        """Calculate maintainability index."""
+        halstead = self.calculate_halstead_volume()
+        complexity = self.calculate_cyclomatic_complexity()
+        lines = self.count_lines()
+
+        # Calculate maintainability index
+        # MI = 171 - 5.2 * ln(V) - 0.23 * CC - 16.2 * ln(LOC)
+        volume = halstead["volume"]
+        avg_complexity = complexity["avg_complexity"]
+        loc = lines["code_lines"]
+
+        mi = 171 - 5.2 * math.log(volume) - 0.23 * avg_complexity - 16.2 * math.log(loc) if volume > 0 and loc > 0 else 0
+
+        # Normalize to 0-100 scale
+        normalized_mi = max(0, min(100, mi * 100 / 171))
+
+        return {
+            "maintainability_index": mi,
+            "normalized_maintainability_index": normalized_mi
+        }
+
+    def get_maintainability_rank(self) -> Dict[str, str]:
+        """Rank the codebase based on maintainability index."""
+        mi = self.calculate_maintainability_index()["normalized_maintainability_index"]
+
+        if mi >= 85:
+            rank = "A"
+            description = "Highly maintainable"
+        elif mi >= 65:
+            rank = "B"
+            description = "Maintainable"
+        elif mi >= 40:
+            rank = "C"
+            description = "Moderately maintainable"
+        elif mi >= 20:
+            rank = "D"
+            description = "Difficult to maintain"
+        else:
+            rank = "F"
+            description = "Very difficult to maintain"
+
+        return {
+            "rank": rank,
+            "description": description,
+            "maintainability_index": mi
+        }
+
+    def get_cognitive_complexity(self) -> Dict[str, Any]:
+        """Calculate cognitive complexity for functions."""
+        functions = list(self.codebase.functions)
+        complexity_results = {
+            "avg_complexity": 0,
+            "max_complexity": 0,
+            "complexity_distribution": {
+                "low": 0,         # 0-5
+                "moderate": 0,    # 6-10
+                "high": 0,        # 11-20
+                "very_high": 0    # > 20
+            },
+            "complex_functions": []
+        }
+
+        if not functions:
+            return complexity_results
+
+        total_complexity = 0
+        max_complexity = 0
+        complex_functions = []
+
+        for func in functions:
+            # A simple approximation of cognitive complexity
+            # In a real implementation, we would parse the AST and analyze control flow
+            source = func.source
+
+            # Count decision points with nesting
+            nesting_level = 0
+            cognitive_complexity = 0
+
+            lines = source.split('\n')
+            for line in lines:
+                line = line.strip()
+
+                # Increase nesting level
+                if re.search(r'\b(if|for|while|switch|case|catch|try)\b', line):
+                    cognitive_complexity += 1 + nesting_level
+                    nesting_level += 1
+
+                # Decrease nesting level on a closing brace; Python dedents are
+                # not tracked by this line-based approximation
+                if line.startswith('}'):
+                    nesting_level = max(0, nesting_level - 1)
+
+                # Add complexity for boolean operators
+                cognitive_complexity += line.count(" && ") + line.count(" and ")
+                cognitive_complexity += line.count(" || ") + line.count(" or 
") + + # Add complexity for jumps + if re.search(r'\b(break|continue|goto|return)\b', line): + cognitive_complexity += 1 + + total_complexity += cognitive_complexity + max_complexity = max(max_complexity, cognitive_complexity) + + # Categorize complexity + if cognitive_complexity <= 5: + complexity_results["complexity_distribution"]["low"] += 1 + elif cognitive_complexity <= 10: + complexity_results["complexity_distribution"]["moderate"] += 1 + elif cognitive_complexity <= 20: + complexity_results["complexity_distribution"]["high"] += 1 + else: + complexity_results["complexity_distribution"]["very_high"] += 1 + + # Track complex functions + if cognitive_complexity > 10: + complex_functions.append({ + "name": func.name, + "file": func.file.file_path if hasattr(func, "file") else "Unknown", + "complexity": cognitive_complexity + }) + + complexity_results["avg_complexity"] = total_complexity / len(functions) + complexity_results["max_complexity"] = max_complexity + complexity_results["complex_functions"] = sorted(complex_functions, key=lambda x: x["complexity"], reverse=True)[:10] # Top 10 most complex + + return complexity_results + + def get_nesting_depth_analysis(self) -> Dict[str, Any]: + """Analyze nesting depth in functions.""" + functions = list(self.codebase.functions) + nesting_results = { + "avg_max_nesting": 0, + "max_nesting": 0, + "nesting_distribution": { + "low": 0, # 0-2 + "moderate": 0, # 3-4 + "high": 0, # 5-6 + "very_high": 0 # > 6 + }, + "deeply_nested_functions": [] + } + + if not functions: + return nesting_results + + total_max_nesting = 0 + max_nesting_overall = 0 + deeply_nested_functions = [] + + for func in functions: + source = func.source + lines = source.split('\n') + + max_nesting = 0 + current_nesting = 0 + + for line in lines: + line = line.strip() + + # Increase nesting level + if re.search(r'\b(if|for|while|switch|case|catch|try)\b', line) and not line.startswith('}'): + current_nesting += 1 + max_nesting = max(max_nesting, current_nesting) + + # Decrease nesting level + if line.startswith('}'): + current_nesting = max(0, current_nesting - 1) + + total_max_nesting += max_nesting + max_nesting_overall = max(max_nesting_overall, max_nesting) + + # Categorize nesting + if max_nesting <= 2: + nesting_results["nesting_distribution"]["low"] += 1 + elif max_nesting <= 4: + nesting_results["nesting_distribution"]["moderate"] += 1 + elif max_nesting <= 6: + nesting_results["nesting_distribution"]["high"] += 1 + else: + nesting_results["nesting_distribution"]["very_high"] += 1 + + # Track deeply nested functions + if max_nesting > 4: + deeply_nested_functions.append({ + "name": func.name, + "file": func.file.file_path if hasattr(func, "file") else "Unknown", + "max_nesting": max_nesting + }) + + nesting_results["avg_max_nesting"] = total_max_nesting / len(functions) + nesting_results["max_nesting"] = max_nesting_overall + nesting_results["deeply_nested_functions"] = sorted(deeply_nested_functions, key=lambda x: x["max_nesting"], reverse=True)[:10] # Top 10 most nested + + return nesting_results + + def get_function_size_metrics(self) -> Dict[str, Any]: + """Get function size metrics.""" + functions = list(self.codebase.functions) + size_metrics = { + "avg_function_length": 0, + "max_function_length": 0, + "function_size_distribution": { + "small": 0, # < 10 lines + "medium": 0, # 10-30 lines + "large": 0, # 30-100 lines + "very_large": 0 # > 100 lines + }, + "largest_functions": [] + } + + if not functions: + return size_metrics + + total_length = 0 + max_length = 0 + 
largest_functions = []
+
+        for func in functions:
+            func_source = func.source
+            func_lines = func_source.count('\n') + 1
+
+            total_length += func_lines
+            max_length = max(max_length, func_lines)
+
+            # Categorize by size
+            if func_lines < 10:
+                size_metrics["function_size_distribution"]["small"] += 1
+            elif func_lines < 30:
+                size_metrics["function_size_distribution"]["medium"] += 1
+            elif func_lines < 100:
+                size_metrics["function_size_distribution"]["large"] += 1
+            else:
+                size_metrics["function_size_distribution"]["very_large"] += 1
+
+            # Track large functions
+            if func_lines > 30:
+                largest_functions.append({
+                    "name": func.name,
+                    "file": func.file.file_path if hasattr(func, "file") else "Unknown",
+                    "lines": func_lines
+                })
+
+        size_metrics["avg_function_length"] = total_length / len(functions)
+        size_metrics["max_function_length"] = max_length
+        size_metrics["largest_functions"] = sorted(largest_functions, key=lambda x: x["lines"], reverse=True)[:10]  # Top 10 largest
+
+        return size_metrics
+
+    #
+    # Visualization and Output Methods
+    #
+
+    def _generate_html_report(self, output_file: str) -> None:
+        """Generate an HTML report of the analysis results."""
+        if not output_file:
+            output_file = "codebase_analysis_report.html"
+
+        # Simple HTML template
+        html = f"""
+        <!DOCTYPE html>
+        <html>
+        <head>
+            <title>Codebase Analysis Report</title>
+        </head>
+        <body>
+            <h1>Codebase Analysis Report</h1>
+
+            <div class="metadata">
+                <h2>Metadata</h2>
+                <p><strong>Repository:</strong> {self.results["metadata"]["repo_name"]}</p>
+                <p><strong>Analysis Time:</strong> {self.results["metadata"]["analysis_time"]}</p>
+                <p><strong>Language:</strong> {self.results["metadata"]["language"]}</p>
+            </div>
+        """
+        for category, metrics in self.results["categories"].items():
+            html += f"""
+            <div class="category">
+                <h2>{category.replace("_", " ").title()}</h2>
+            """
+
+            for metric_name, metric_value in metrics.items():
+                html += f"""
+                <div class="metric">
+                    <h3>{metric_name.replace("_", " ").title()}</h3>
+                    <pre>{json.dumps(metric_value, indent=2)}</pre>
+                </div>
+                """
+
+            html += "</div>"
+
+        html += """
+        </body>
+        </html>
+        """
+
+        with open(output_file, "w") as f:
+            f.write(html)
+
+        self.console.print(f"[bold green]HTML report saved to {output_file}[/bold green]")
+
+    def _print_console_report(self) -> None:
+        """Print a summary report to the console."""
+        self.console.print(f"[bold blue]Codebase Analysis Report for {self.results['metadata']['repo_name']}[/bold blue]")
+        self.console.print(f"[bold]Analysis Time:[/bold] {self.results['metadata']['analysis_time']}")
+        self.console.print(f"[bold]Language:[/bold] {self.results['metadata']['language']}")
+
+        for category, metrics in self.results["categories"].items():
+            self.console.print(f"\n[bold green]{category.replace('_', ' ').title()}[/bold green]")
+
+            for metric_name, metric_value in metrics.items():
+                self.console.print(f"[bold]{metric_name.replace('_', ' ').title()}:[/bold]")
+
+                if isinstance(metric_value, dict):
+                    table = Table(show_header=True)
+                    table.add_column("Key")
+                    table.add_column("Value")
+
+                    for k, v in metric_value.items():
+                        table.add_row(str(k), str(v))
+
+                    self.console.print(table)
+                elif isinstance(metric_value, list):
+                    if len(metric_value) > 0 and isinstance(metric_value[0], dict):
+                        table = Table(show_header=True)
+                        for key in metric_value[0].keys():
+                            table.add_column(key)
+
+                        for item in metric_value[:10]:  # Show only first 10 items
+                            table.add_row(*[str(v) for v in item.values()])
+
+                        self.console.print(table)
+                        if len(metric_value) > 10:
+                            self.console.print(f"... and {len(metric_value) - 10} more items")
+                    else:
+                        self.console.print(str(metric_value))
+                else:
+                    self.console.print(str(metric_value))
+
+    def get_monthly_commits(self) -> Dict[str, int]:
+        """Get the number of commits per month."""
+        try:
+            # Get commit history
+            commits = list(self.codebase.github.repo.get_commits())
+
+            # Group commits by month
+            commits_by_month = {}
+
+            for commit in commits:
+                date = commit.commit.author.date
+                month_key = f"{date.year}-{date.month:02d}"
+
+                if month_key in commits_by_month:
+                    commits_by_month[month_key] += 1
+                else:
+                    commits_by_month[month_key] = 1
+
+            # Sort by month
+            sorted_commits = dict(sorted(commits_by_month.items()))
+
+            return sorted_commits
+        except Exception as e:
+            return {"error": str(e)}
+
+
+def main():
+    """Main entry point for the codebase analyzer."""
+    parser = argparse.ArgumentParser(description="Comprehensive Codebase Analyzer")
+
+    # Repository source
+    source_group = parser.add_mutually_exclusive_group(required=True)
+    source_group.add_argument("--repo-url", help="URL of the repository to analyze")
+    source_group.add_argument("--repo-path", help="Local path to the repository to analyze")
+
+    # Analysis options
+    parser.add_argument("--language", help="Programming language of the codebase (auto-detected if not provided)")
+    parser.add_argument("--categories", nargs="+", help="Categories to analyze (default: all)")
+
+    # Output options
+    parser.add_argument("--output-format", choices=["json", "html", "console"], default="console", help="Output format")
+    parser.add_argument("--output-file", help="Path to the output file")
+
+    args = parser.parse_args()
+
+    try:
+        # Initialize the analyzer
+        analyzer = CodebaseAnalyzer(
+            repo_url=args.repo_url,
+            repo_path=args.repo_path,
+            language=args.language
+        )
+
+        # Perform the analysis
+        results = analyzer.analyze(
+            categories=args.categories,
+            output_format=args.output_format,
+            output_file=args.output_file
+        )
+
+        # Print success message
+        if 
args.output_format == "json" and args.output_file: + print(f"Analysis results saved to {args.output_file}") + elif args.output_format == "html": + print(f"HTML report saved to {args.output_file or 'codebase_analysis_report.html'}") + + except Exception as e: + print(f"Error: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/codegen-on-oss/codegen_on_oss/analysis/optimized_analyzer.py b/codegen-on-oss/codegen_on_oss/analysis/optimized_analyzer.py new file mode 100644 index 000000000..8b410913d --- /dev/null +++ b/codegen-on-oss/codegen_on_oss/analysis/optimized_analyzer.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python3 +""" +Optimized Codebase Analyzer + +This module provides an optimized version of the codebase analyzer +that uses performance optimizations to handle large codebases efficiently. +""" + +import sys +from typing import Dict, List, Any, Optional + +from .codebase_analyzer import CodebaseAnalyzer as BaseCodebaseAnalyzer +from .performance_optimizations import ( + cached_analysis, + parallel_analysis, + memory_optimized, + timed_analysis, + incremental_analysis, + optimized_analysis, +) + + +class OptimizedCodebaseAnalyzer(BaseCodebaseAnalyzer): + """ + Optimized version of the codebase analyzer. + + This class extends the base codebase analyzer with performance optimizations + to handle large codebases efficiently. + """ + + def __init__(self, repo_url: str = None, repo_path: str = None, language: str = None): + """ + Initialize the OptimizedCodebaseAnalyzer. + + Args: + repo_url: URL of the repository to analyze + repo_path: Local path to the repository to analyze + language: Programming language of the codebase (auto-detected if not provided) + """ + super().__init__(repo_url, repo_path, language) + self._temp_data = {} + self._enable_parallel = True + self._enable_incremental = True + + @optimized_analysis + def analyze(self, categories: List[str] = None, output_format: str = "json", output_file: str = None) -> Dict[str, Any]: + """ + Perform a comprehensive analysis of the codebase with performance optimizations. + + Args: + categories: List of categories to analyze. If None, all categories are analyzed. 
+ output_format: Format of the output (json, html, console) + output_file: Path to the output file + + Returns: + Dict containing the analysis results + """ + return super().analyze(categories, output_format, output_file) + + @optimized_analysis + def get_file_count(self) -> Dict[str, int]: + """Get the number of files in the codebase with performance optimizations.""" + return super().get_file_count() + + @optimized_analysis + def get_files_by_language(self) -> Dict[str, int]: + """Get the number of files by language with performance optimizations.""" + return super().get_files_by_language() + + @optimized_analysis + def get_file_size_distribution(self) -> Dict[str, Any]: + """Get the distribution of file sizes with performance optimizations.""" + return super().get_file_size_distribution() + + @optimized_analysis + def get_directory_structure(self) -> Dict[str, Any]: + """Get the directory structure with performance optimizations.""" + return super().get_directory_structure() + + @optimized_analysis + def get_symbol_count(self) -> Dict[str, int]: + """Get the number of symbols in the codebase with performance optimizations.""" + return super().get_symbol_count() + + @optimized_analysis + def get_symbol_type_distribution(self) -> Dict[str, int]: + """Get the distribution of symbol types with performance optimizations.""" + return super().get_symbol_type_distribution() + + @optimized_analysis + def get_symbol_hierarchy(self) -> Dict[str, Any]: + """Get the symbol hierarchy with performance optimizations.""" + return super().get_symbol_hierarchy() + + @optimized_analysis + def get_top_level_vs_nested_symbols(self) -> Dict[str, int]: + """Get the number of top-level vs. nested symbols with performance optimizations.""" + return super().get_top_level_vs_nested_symbols() + + @optimized_analysis + def get_import_dependency_map(self) -> Dict[str, List[str]]: + """Get the import dependency map with performance optimizations.""" + return super().get_import_dependency_map() + + @optimized_analysis + def get_external_vs_internal_dependencies(self) -> Dict[str, int]: + """Get the number of external vs. internal dependencies with performance optimizations.""" + return super().get_external_vs_internal_dependencies() + + @optimized_analysis + def get_circular_imports(self) -> List[List[str]]: + """Get the circular imports with performance optimizations.""" + return super().get_circular_imports() + + @optimized_analysis + def get_unused_imports(self) -> List[Dict[str, str]]: + """Get the unused imports with performance optimizations.""" + return super().get_unused_imports() + + @optimized_analysis + def get_module_coupling_metrics(self) -> Dict[str, float]: + """Get the module coupling metrics with performance optimizations.""" + return super().get_module_coupling_metrics() + + @optimized_analysis + def get_module_cohesion_analysis(self) -> Dict[str, float]: + """Get the module cohesion analysis with performance optimizations.""" + return super().get_module_cohesion_analysis() + + @optimized_analysis + def get_package_structure(self) -> Dict[str, Any]: + """Get the package structure with performance optimizations.""" + return super().get_package_structure() + + @optimized_analysis + def get_module_dependency_graph(self) -> Dict[str, Any]: + """Get the module dependency graph with performance optimizations.""" + return super().get_module_dependency_graph() + + # Add optimized versions of all other analysis methods + # ... 
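+
+    # For example (illustrative), any remaining base-class method can be
+    # wrapped the same way:
+    #
+    #     @optimized_analysis
+    #     def get_unused_functions(self) -> List[Dict[str, str]]:
+    #         return super().get_unused_functions()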
+ + def enable_parallel_processing(self, enable: bool = True) -> None: + """ + Enable or disable parallel processing. + + Args: + enable: Whether to enable parallel processing + """ + self._enable_parallel = enable + + def enable_incremental_analysis(self, enable: bool = True) -> None: + """ + Enable or disable incremental analysis. + + Args: + enable: Whether to enable incremental analysis + """ + self._enable_incremental = enable + + def clear_cache(self) -> None: + """Clear the analysis cache.""" + if hasattr(self, "_analysis_cache"): + self._analysis_cache.clear() + + +# Update the CodebaseAnalyzer class to use the optimized version +CodebaseAnalyzer = OptimizedCodebaseAnalyzer + diff --git a/codegen-on-oss/codegen_on_oss/analysis/performance_optimizations.py b/codegen-on-oss/codegen_on_oss/analysis/performance_optimizations.py new file mode 100644 index 000000000..363f56d6e --- /dev/null +++ b/codegen-on-oss/codegen_on_oss/analysis/performance_optimizations.py @@ -0,0 +1,340 @@ +#!/usr/bin/env python3 +""" +Performance optimizations for the codebase analyzer. + +This module provides performance optimizations for the codebase analyzer +to handle large codebases efficiently. +""" + +import functools +import os +import pickle +import time +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypeVar, cast + +# Type variables for generic functions +T = TypeVar('T') +R = TypeVar('R') + + +class AnalysisCache: + """ + Cache for analysis results to avoid redundant calculations. + + This class provides a simple caching mechanism for analysis results + to avoid redundant calculations when analyzing large codebases. + """ + + def __init__(self, cache_dir: Optional[Path] = None): + """ + Initialize the analysis cache. + + Args: + cache_dir: Directory to store cache files + """ + if cache_dir is None: + cache_dir = Path.home() / ".cache" / "codegen-on-oss" / "analysis" + + self.cache_dir = cache_dir + self.cache_dir.mkdir(parents=True, exist_ok=True) + self.memory_cache: Dict[str, Any] = {} + + def get(self, key: str) -> Optional[Any]: + """ + Get a value from the cache. + + Args: + key: Cache key + + Returns: + Cached value or None if not found + """ + # Check memory cache first + if key in self.memory_cache: + return self.memory_cache[key] + + # Check disk cache + cache_file = self.cache_dir / f"{key}.pickle" + if cache_file.exists(): + try: + with open(cache_file, "rb") as f: + value = pickle.load(f) + + # Store in memory cache for faster access + self.memory_cache[key] = value + + return value + except Exception: + # If there's an error loading the cache, ignore it + return None + + return None + + def set(self, key: str, value: Any) -> None: + """ + Set a value in the cache. + + Args: + key: Cache key + value: Value to cache + """ + # Store in memory cache + self.memory_cache[key] = value + + # Store in disk cache + cache_file = self.cache_dir / f"{key}.pickle" + try: + with open(cache_file, "wb") as f: + pickle.dump(value, f) + except Exception: + # If there's an error saving the cache, ignore it + pass + + def clear(self) -> None: + """Clear the cache.""" + # Clear memory cache + self.memory_cache.clear() + + # Clear disk cache + for cache_file in self.cache_dir.glob("*.pickle"): + try: + os.remove(cache_file) + except Exception: + # If there's an error removing the cache file, ignore it + pass + + +def cached_analysis(func: Callable[..., R]) -> Callable[..., R]: + """ + Decorator to cache analysis results. 
+
+    This decorator caches the results of analysis functions to avoid
+    redundant calculations when analyzing large codebases.
+
+    Args:
+        func: Function to cache
+
+    Returns:
+        Cached function
+    """
+    @functools.wraps(func)
+    def wrapper(self: Any, *args: Any, **kwargs: Any) -> R:
+        # Get the cache
+        if not hasattr(self, "_analysis_cache"):
+            self._analysis_cache = AnalysisCache()
+
+        # Generate a cache key
+        key = f"{func.__name__}_{hash(str(args))}_{hash(str(kwargs))}"
+
+        # Check if the result is already cached
+        cached_result = self._analysis_cache.get(key)
+        if cached_result is not None:
+            return cast(R, cached_result)
+
+        # Call the function
+        result = func(self, *args, **kwargs)
+
+        # Cache the result
+        self._analysis_cache.set(key, result)
+
+        return result
+
+    return wrapper
+
+
+def parallel_analysis(func: Callable[..., R]) -> Callable[..., R]:
+    """
+    Decorator to parallelize analysis functions.
+
+    This decorator offloads analysis functions to a worker pool to improve
+    responsiveness when analyzing large codebases.
+
+    Args:
+        func: Function to parallelize
+
+    Returns:
+        Parallelized function
+    """
+    @functools.wraps(func)
+    def wrapper(self: Any, *args: Any, **kwargs: Any) -> R:
+        # Check if parallelization is enabled
+        if not getattr(self, "_enable_parallel", True):
+            return func(self, *args, **kwargs)
+
+        # A bound method closes over `self` and cannot be pickled, so it
+        # cannot be shipped to a process pool; a thread pool runs the call
+        # without pickling.
+        from multiprocessing.pool import ThreadPool
+
+        with ThreadPool(processes=1) as pool:
+            result = pool.apply(func, (self, *args), kwargs)
+
+        return result
+
+    return wrapper
+
+
+def memory_optimized(func: Callable[..., R]) -> Callable[..., R]:
+    """
+    Decorator to optimize memory usage.
+
+    This decorator optimizes memory usage when analyzing large codebases
+    by clearing unnecessary data after the analysis is complete.
+
+    Args:
+        func: Function to optimize
+
+    Returns:
+        Memory-optimized function
+    """
+    @functools.wraps(func)
+    def wrapper(self: Any, *args: Any, **kwargs: Any) -> R:
+        # Call the function
+        result = func(self, *args, **kwargs)
+
+        # Clear any temporary data
+        if hasattr(self, "_temp_data"):
+            self._temp_data.clear()
+
+        # Force garbage collection
+        import gc
+        gc.collect()
+
+        return result
+
+    return wrapper
+
+
+def timed_analysis(func: Callable[..., R]) -> Callable[..., R]:
+    """
+    Decorator to time analysis functions.
+
+    This decorator times analysis functions to help identify performance
+    bottlenecks when analyzing large codebases.
+
+    Args:
+        func: Function to time
+
+    Returns:
+        Timed function
+    """
+    @functools.wraps(func)
+    def wrapper(self: Any, *args: Any, **kwargs: Any) -> R:
+        # Get the logger
+        import logging
+        logger = logging.getLogger(__name__)
+
+        # Start timing
+        start_time = time.time()
+
+        # Call the function
+        result = func(self, *args, **kwargs)
+
+        # End timing
+        end_time = time.time()
+        duration = end_time - start_time
+
+        # Log the duration
+        logger.info(f"{func.__name__} took {duration:.2f} seconds")
+
+        return result
+
+    return wrapper
+
+
+def incremental_analysis(func: Callable[..., R]) -> Callable[..., R]:
+    """
+    Decorator for incremental analysis.
+
+    This decorator enables incremental analysis to only analyze changes
+    since the last analysis, which can significantly improve performance
+    when analyzing large codebases.
+
+    Args:
+        func: Function to make incremental
+
+    Returns:
+        Incremental function
+    """
+    @functools.wraps(func)
+    def wrapper(self: Any, *args: Any, **kwargs: Any) -> R:
+        # Check if incremental analysis is enabled
+        if not getattr(self, "_enable_incremental", True):
+            return func(self, *args, **kwargs)
+
+        # Get the cache
+        if not hasattr(self, "_analysis_cache"):
+            self._analysis_cache = AnalysisCache()
+
+        # Generate a cache key
+        key = f"{func.__name__}_incremental_{hash(str(args))}_{hash(str(kwargs))}"
+
+        # Get the last analysis time
+        last_analysis_time = self._analysis_cache.get(f"{key}_time")
+
+        # Check if we have a previous result
+        previous_result = self._analysis_cache.get(key)
+
+        # Check if the codebase has changed since the last analysis
+        if last_analysis_time is not None and previous_result is not None:
+            # Get the last modification time of the codebase;
+            # default=0.0 keeps an empty tree from raising ValueError
+            if hasattr(self, "repo_path") and self.repo_path:
+                repo_path = Path(self.repo_path)
+                last_modified = max(
+                    (
+                        os.path.getmtime(os.path.join(root, file))
+                        for root, _, files in os.walk(repo_path)
+                        for file in files
+                    ),
+                    default=0.0,
+                )
+
+                # If the codebase hasn't changed, return the previous result
+                if last_modified < last_analysis_time:
+                    return cast(R, previous_result)
+
+        # Call the function
+        result = func(self, *args, **kwargs)
+
+        # Cache the result and the current time
+        self._analysis_cache.set(key, result)
+        self._analysis_cache.set(f"{key}_time", time.time())
+
+        return result
+
+    return wrapper
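+
+
+# Illustrative (not part of the public API): the decorators can also be
+# applied individually, e.g.
+#
+#     class MyAnalyzer(CodebaseAnalyzer):
+#         @cached_analysis
+#         @timed_analysis
+#         def get_symbol_count(self) -> Dict[str, int]:
+#             return super().get_symbol_count()
+#
+# `optimized_analysis` below simply stacks all five decorators.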
+
+
+# Apply all optimizations
+def optimized_analysis(func: Callable[..., R]) -> Callable[..., R]:
+    """
+    Apply all optimizations to an analysis function.
+
+    This decorator applies all optimizations to an analysis function:
+    - Caching
+    - Parallelization
+    - Memory optimization
+    - Timing
+    - Incremental analysis
+
+    Args:
+        func: Function to optimize
+
+    Returns:
+        Optimized function
+    """
+    return cached_analysis(
+        parallel_analysis(
+            memory_optimized(
+                timed_analysis(
+                    incremental_analysis(func)
+                )
+            )
+        )
+    )
+
diff --git a/codegen-on-oss/codegen_on_oss/cli.py b/codegen-on-oss/codegen_on_oss/cli.py
index c1807d13e..7abd752e3 100644
--- a/codegen-on-oss/codegen_on_oss/cli.py
+++ b/codegen-on-oss/codegen_on_oss/cli.py
@@ -1,127 +1,187 @@
+import os
 import sys
 from pathlib import Path
+from typing import Optional

 import click
 from loguru import logger

-from codegen_on_oss.cache import cachedir
-from codegen_on_oss.metrics import MetricsProfiler
-from codegen_on_oss.outputs.csv_output import CSVOutput
-from codegen_on_oss.parser import CodegenParser
-from codegen_on_oss.sources import RepoSource, all_sources
+from .metrics import MetricsProfiler
+from .outputs.csv_output import CSVOutput
+from .parser import CodegenParser, ParseRunError
+from .sources import CSVInputSource, GithubSource, RepoSource
+
+# Add import for the codebase analyzer
+from .analysis.codebase_analyzer import CodebaseAnalyzer

 logger.remove(0)


 @click.group()
 def cli():
+    """Codegen OSS Parser CLI."""
     pass


-@cli.command(name="run-one")
-@click.argument("url", type=str)
+@cli.command()
 @click.option(
-    "--cache-dir",
-    type=click.Path(dir_okay=True),
-    help="Cache directory",
-    default=cachedir,
+    "--source",
+    type=click.Choice(["csv", "github"]),
+    default="csv",
+    help="Source of repository URLs",
 )
 @click.option(
     "--output-path",
-    type=click.Path(dir_okay=True),
-    help="Output path",
+    type=click.Path(dir_okay=False, writable=True),
     default="metrics.csv",
-)
-@click.option(
-    "--commit-hash",
-    type=str,
-    help="Commit hash to parse",
+    help="Path to write metrics CSV",
 )
 @click.option(
     "--error-output-path",
-    type=click.Path(dir_okay=True),
-    help="Error output path",
-    default=cachedir / "errors.log",
+    type=click.Path(dir_okay=False, writable=True),
+    default="errors.log",
+    help="Path to write error logs",
 )
 @click.option(
-    "--debug",
-    is_flag=True,
-    help="Debug mode",
+    "--cache-dir",
+    type=click.Path(file_okay=False, writable=True),
+    default=Path.home() / ".cache" / "codegen-on-oss",
+    help="Directory to cache repositories",
 )
-def run_one(
-    url: str,
-    cache_dir: str | Path = str(cachedir),
-    output_path: str = "metrics.csv",
-    commit_hash: str | None = None,
-    error_output_path: Path = str(cachedir / "errors.log"),
-    debug: bool = False,
-):
-    """
-    Parse a repository with codegen
-    """
-    logger.add(error_output_path, level="ERROR")
-    logger.add(sys.stdout, level="DEBUG" if debug else "INFO")
+def run(
+    source: str,
+    output_path: str,
+    error_output_path: str,
+    cache_dir: Path,
+) -> None:
+    """Run the parser on multiple repositories."""
+    logger.add(
+        error_output_path, format="{time: HH:mm:ss} {level} {message}", level="ERROR"
+    )
+    logger.add(
+        sys.stdout,
+        format="{time: HH:mm:ss} {level} {message}",
+        level="DEBUG",
+    )
+
+    repo_source = RepoSource.from_source_type(source)

     output = CSVOutput(MetricsProfiler.fields(), output_path)
     metrics_profiler = MetricsProfiler(output)
     parser = CodegenParser(Path(cache_dir) / "repositories", metrics_profiler)
-    parser.parse(url, commit_hash)
+    for repo_url, commit_hash in repo_source:
+        parser.parse(repo_url, commit_hash)


 @cli.command()
 @click.option(
-    "--source",
-    type=click.Choice(list(all_sources.keys())),
-    default="csv",
+
"--repo-url", + type=str, + required=True, + help="URL of the repository to parse", ) @click.option( "--output-path", - type=click.Path(dir_okay=True), - help="Output path", + type=click.Path(dir_okay=False, writable=True), default="metrics.csv", + help="Path to write metrics CSV", ) @click.option( "--error-output-path", - type=click.Path(dir_okay=True), - help="Error output path", + type=click.Path(dir_okay=False, writable=True), default="errors.log", + help="Path to write error logs", ) @click.option( "--cache-dir", - type=click.Path(dir_okay=True), - help="Cache directory", - default=cachedir, -) -@click.option( - "--debug", - is_flag=True, - help="Debug mode", + type=click.Path(file_okay=False, writable=True), + default=Path.home() / ".cache" / "codegen-on-oss", + help="Directory to cache repositories", ) -def run( - source: str, +def run_one( + repo_url: str, output_path: str, error_output_path: str, - cache_dir: str, - debug: bool, -): - """ - Run codegen parsing pipeline on repositories from a given repository source. - """ - logger.add( - error_output_path, format="{time: HH:mm:ss} {level} {message}", level="ERROR" - ) - logger.add( - sys.stdout, - format="{time: HH:mm:ss} {level} {message}", - level="DEBUG" if debug else "INFO", - ) - - repo_source = RepoSource.from_source_type(source) + cache_dir: Path, +) -> None: + """Run the parser on a single repository.""" + logger.add(error_output_path, level="ERROR") + logger.add(sys.stdout, level="DEBUG") output = CSVOutput(MetricsProfiler.fields(), output_path) metrics_profiler = MetricsProfiler(output) parser = CodegenParser(Path(cache_dir) / "repositories", metrics_profiler) - for repo_url, commit_hash in repo_source: - parser.parse(repo_url, commit_hash) + parser.parse(repo_url) + + +@cli.command() +@click.option( + "--repo-url", + type=str, + help="URL of the repository to analyze", +) +@click.option( + "--repo-path", + type=click.Path(exists=True, file_okay=False), + help="Local path to the repository to analyze", +) +@click.option( + "--language", + type=str, + help="Programming language of the codebase (auto-detected if not provided)", +) +@click.option( + "--categories", + multiple=True, + help="Categories to analyze (default: all)", +) +@click.option( + "--output-format", + type=click.Choice(["json", "html", "console"]), + default="console", + help="Output format", +) +@click.option( + "--output-file", + type=click.Path(dir_okay=False, writable=True), + help="Path to the output file", +) +def analyze( + repo_url: Optional[str], + repo_path: Optional[str], + language: Optional[str], + categories: Optional[tuple], + output_format: str, + output_file: Optional[str], +) -> None: + """Analyze a codebase and generate a report.""" + if not repo_url and not repo_path: + click.echo("Error: Either --repo-url or --repo-path must be provided") + sys.exit(1) + + try: + # Initialize the analyzer + analyzer = CodebaseAnalyzer( + repo_url=repo_url, + repo_path=repo_path, + language=language + ) + + # Perform the analysis + results = analyzer.analyze( + categories=list(categories) if categories else None, + output_format=output_format, + output_file=output_file + ) + + # Print success message + if output_format == "json" and output_file: + click.echo(f"Analysis results saved to {output_file}") + elif output_format == "html": + click.echo(f"HTML report saved to {output_file or 'codebase_analysis_report.html'}") + + except Exception as e: + click.echo(f"Error: {e}") + import traceback + traceback.print_exc() + sys.exit(1) if __name__ == 
"__main__": diff --git a/codegen-on-oss/docs/codebase_analyzer.md b/codegen-on-oss/docs/codebase_analyzer.md new file mode 100644 index 000000000..752e13ef6 --- /dev/null +++ b/codegen-on-oss/docs/codebase_analyzer.md @@ -0,0 +1,253 @@ +# Codebase Analyzer + +The Codebase Analyzer is a comprehensive static code analysis tool that provides detailed insights into a codebase's structure, dependencies, code quality, and more. + +## Installation + +The Codebase Analyzer is included in the codegen-on-oss package. To install it, run: + +```bash +pip install codegen-on-oss +``` + +## Usage + +### Command Line Interface + +The Codebase Analyzer can be used from the command line: + +```bash +# Analyze a repository by URL +cgparse analyze --repo-url https://github.com/username/repo + +# Analyze a local repository +cgparse analyze --repo-path /path/to/local/repo + +# Specify output format and file +cgparse analyze --repo-url https://github.com/username/repo --output-format html --output-file report.html + +# Analyze specific categories +cgparse analyze --repo-url https://github.com/username/repo --categories codebase_structure code_quality +``` + +### Python API + +The Codebase Analyzer can also be used as a Python library: + +```python +from codegen_on_oss.analysis import CodebaseAnalyzer + +# Initialize the analyzer +analyzer = CodebaseAnalyzer(repo_url="https://github.com/username/repo") + +# Perform the analysis +results = analyzer.analyze( + categories=["codebase_structure", "code_quality"], + output_format="json", + output_file="analysis.json" +) + +# Access the results +print(results["metadata"]["repo_name"]) +print(results["categories"]["codebase_structure"]["file_count"]) +``` + +## Analysis Categories + +The Codebase Analyzer provides analysis in the following categories: + +### Codebase Structure + +Analyzes the overall structure of the codebase, including: + +- File count and distribution +- Language distribution +- Directory structure +- Symbol count and distribution +- Import dependencies +- Module coupling and cohesion +- Package structure +- Module dependency graph + +### Symbol Level + +Analyzes individual symbols (functions, classes, etc.) in the codebase, including: + +- Function parameter analysis +- Return type analysis +- Function complexity metrics +- Call site tracking +- Async function detection +- Function overload analysis +- Inheritance hierarchy +- Method analysis +- Attribute analysis +- Constructor analysis +- Interface implementation verification +- Access modifier usage +- Type inference +- Usage tracking +- Scope analysis +- Constant vs. 
mutable usage
+- Global variable detection
+- Type alias resolution
+- Generic type usage
+- Type consistency checking
+- Union/intersection type analysis
+
+### Dependency Flow
+
+Analyzes the flow of dependencies in the codebase, including:
+
+- Function call relationships
+- Call hierarchy visualization
+- Entry point analysis
+- Dead code detection
+- Variable usage tracking
+- Data transformation paths
+- Input/output parameter analysis
+- Conditional branch analysis
+- Loop structure analysis
+- Exception handling paths
+- Return statement analysis
+- Symbol reference tracking
+- Usage frequency metrics
+- Cross-file symbol usage
+
+### Code Quality
+
+Analyzes the quality of the code, including:
+
+- Unused functions, classes, and variables
+- Similar function detection
+- Repeated code patterns
+- Refactoring opportunities
+- Cyclomatic complexity
+- Cognitive complexity
+- Nesting depth analysis
+- Function size metrics
+- Naming convention consistency
+- Comment coverage
+- Documentation completeness
+- Code formatting consistency
+
+### Visualization
+
+Provides visualizations of the codebase, including:
+
+- Module dependency visualization
+- Symbol dependency visualization
+- Import relationship graphs
+- Function call visualization
+- Call hierarchy trees
+- Entry point flow diagrams
+- Class hierarchy visualization
+- Symbol relationship diagrams
+- Package structure visualization
+- Code complexity heat maps
+- Usage frequency visualization
+- Change frequency analysis
+
+### Language Specific
+
+Provides language-specific analysis, including:
+
+- Decorator usage analysis
+- Dynamic attribute access detection
+- Type hint coverage
+- Magic method usage
+- Interface implementation verification
+- Type definition completeness
+- JSX/TSX component analysis
+- Type narrowing pattern detection
+
+### Code Metrics
+
+Provides code metrics, including:
+
+- Monthly commits
+- Cyclomatic complexity
+- Halstead volume
+- Maintainability index
+- Lines of code
+
+## Performance Optimization
+
+The Codebase Analyzer includes several optimizations for analyzing large codebases:
+
+1. **Caching**: The analyzer caches intermediate results to avoid redundant calculations.
+2. **Lazy Loading**: The analyzer loads only the parts of the codebase needed for the requested analysis.
+3. **Incremental Analysis**: The analyzer supports incremental analysis, re-analyzing only what has changed since the last run.
+4. **Parallel Processing**: The analyzer uses parallel processing for independent analysis tasks.
+5. **Memory Optimization**: The analyzer uses memory-efficient data structures and algorithms to minimize memory usage.
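+
+The parallel and incremental behaviors can be toggled per instance. A minimal
+sketch, using the classes and methods defined in `optimized_analyzer.py`:
+
+```python
+from codegen_on_oss.analysis.optimized_analyzer import OptimizedCodebaseAnalyzer
+
+analyzer = OptimizedCodebaseAnalyzer(repo_path="/path/to/local/repo")
+analyzer.enable_parallel_processing(False)  # fall back to sequential execution
+analyzer.enable_incremental_analysis(True)  # reuse results while files are unchanged
+results = analyzer.analyze(categories=["code_quality"])
+analyzer.clear_cache()  # drop cached results before a fresh run
+```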
+
+## Extending the Analyzer
+
+The Codebase Analyzer is designed to be extensible. You can add new analysis methods by:
+
+1. Adding a new method to the `CodebaseAnalyzer` class
+2. Adding the method to the appropriate category in the `METRICS_CATEGORIES` dictionary
+3. Implementing the method to return a dictionary of analysis results
+
+For example, to add a new method for analyzing function names:
+
+```python
+def get_function_name_analysis(self) -> Dict[str, Any]:
+    """Analyze function names in the codebase."""
+    functions = list(self.codebase.functions)
+    name_analysis = {
+        "avg_name_length": 0,
+        "name_patterns": {},
+        "common_prefixes": {},
+        "common_suffixes": {}
+    }
+
+    # Implement the analysis
+    # ...
+
+    return name_analysis
+```
+
+Then add it to the appropriate category:
+
+```python
+METRICS_CATEGORIES = {
+    "code_quality": [
+        # ... existing methods ...
+        "get_function_name_analysis",
+    ],
+    # ... other categories ...
+}
+```
+
+## Troubleshooting
+
+### Common Issues
+
+1. **Memory Errors**: If you encounter memory errors when analyzing large codebases, try:
+   - Analyzing specific categories instead of all categories
+   - Using a machine with more memory
+   - Reducing the depth of analysis
+
+2. **Slow Analysis**: If the analysis is slow, try:
+   - Analyzing specific categories instead of all categories
+   - Using a faster machine
+   - Reducing the depth of analysis
+
+3. **Import Errors**: If you encounter import errors, make sure:
+   - The Codegen SDK is installed
+   - All dependencies are installed
+   - The Python path is correctly set
+
+### Getting Help
+
+If you encounter issues with the Codebase Analyzer, please:
+
+1. Check the documentation
+2. Check the GitHub issues for similar problems
+3. Create a new issue with a detailed description of the problem
+
+## Contributing
+
+Contributions to the Codebase Analyzer are welcome! Please see the [contributing guidelines](CONTRIBUTING.md) for more information.
+
diff --git a/codegen-on-oss/pyproject.toml b/codegen-on-oss/pyproject.toml
index b4227c454..7dd444461 100644
--- a/codegen-on-oss/pyproject.toml
+++ b/codegen-on-oss/pyproject.toml
@@ -22,6 +22,9 @@ dependencies = [
   "modal>=0.73.51",
   "pydantic-settings>=2.7.1",
   "pygithub>=2.5.0",
+  "matplotlib>=3.8.0",
+  "networkx>=3.1",
+  "rich>=13.5.0",
 ]

 [project.urls]
diff --git a/codegen-on-oss/scripts/test_analyzer.py b/codegen-on-oss/scripts/test_analyzer.py
new file mode 100644
index 000000000..7e9d5757f
--- /dev/null
+++ b/codegen-on-oss/scripts/test_analyzer.py
@@ -0,0 +1,182 @@
+#!/usr/bin/env python3
+"""
+Test the codebase analyzer with various repositories.
+
+This script tests the codebase analyzer with a variety of repositories
+to ensure it works correctly with different codebases.
+"""
+
+import argparse
+import json
+import os
+import sys
+import time
+from pathlib import Path
+from typing import Dict, List, Any
+
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
+
+from codegen_on_oss.analysis.codebase_analyzer import CodebaseAnalyzer
+from rich.console import Console
+from rich.table import Table
+from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn
+
+
+# Test repositories
+TEST_REPOS = [
+    # Small repositories
+    "https://github.com/pallets/click",
+    "https://github.com/psf/black",
+
+    # Medium repositories
+    "https://github.com/django/django",
+    "https://github.com/pandas-dev/pandas",
+
+    # Large repositories
+    "https://github.com/tensorflow/tensorflow",
+    "https://github.com/pytorch/pytorch",
+
+    # Different languages
+    "https://github.com/facebook/react",  # JavaScript
+    "https://github.com/golang/go",  # Go
+    "https://github.com/rust-lang/rust",  # Rust
+]
+
+
+def test_repository(repo_url: str, categories: List[str] = None, output_dir: Path = None) -> Dict[str, Any]:
+    """
+    Test the codebase analyzer with a repository.
+ + Args: + repo_url: URL of the repository to test + categories: List of categories to analyze + output_dir: Directory to save the analysis results + + Returns: + Dict containing the test results + """ + console = Console() + console.print(f"[bold blue]Testing repository: {repo_url}[/bold blue]") + + start_time = time.time() + + try: + # Initialize the analyzer + analyzer = CodebaseAnalyzer(repo_url=repo_url) + + # Perform the analysis + results = analyzer.analyze(categories=categories) + + end_time = time.time() + duration = end_time - start_time + + # Save the results if output_dir is provided + if output_dir: + output_dir.mkdir(parents=True, exist_ok=True) + repo_name = repo_url.split("/")[-1] + output_file = output_dir / f"{repo_name}.json" + + with open(output_file, "w") as f: + json.dump(results, f, indent=2) + + console.print(f"[bold green]Results saved to {output_file}[/bold green]") + + console.print(f"[bold green]Analysis completed in {duration:.2f} seconds[/bold green]") + + return { + "repo_url": repo_url, + "success": True, + "duration": duration, + "error": None, + "results": results + } + + except Exception as e: + end_time = time.time() + duration = end_time - start_time + + console.print(f"[bold red]Error analyzing repository: {e}[/bold red]") + + return { + "repo_url": repo_url, + "success": False, + "duration": duration, + "error": str(e), + "results": None + } + + +def main(): + """Main entry point for the test script.""" + parser = argparse.ArgumentParser(description="Test the codebase analyzer with various repositories") + + parser.add_argument( + "--repos", + nargs="+", + help="List of repository URLs to test (default: predefined list)", + ) + parser.add_argument( + "--categories", + nargs="+", + help="Categories to analyze (default: all)", + ) + parser.add_argument( + "--output-dir", + type=Path, + help="Directory to save the analysis results", + ) + + args = parser.parse_args() + + repos = args.repos or TEST_REPOS + categories = args.categories + output_dir = args.output_dir + + console = Console() + + # Create a table to display the results + table = Table(title="Codebase Analyzer Test Results") + table.add_column("Repository") + table.add_column("Success") + table.add_column("Duration (s)") + table.add_column("Error") + + # Test each repository + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + TimeElapsedColumn(), + ) as progress: + task = progress.add_task("[cyan]Testing repositories...", total=len(repos)) + + results = [] + + for repo_url in repos: + progress.update(task, description=f"[cyan]Testing {repo_url}...") + + result = test_repository(repo_url, categories, output_dir) + results.append(result) + + progress.advance(task) + + # Display the results + for result in results: + table.add_row( + result["repo_url"], + "[green]✓[/green]" if result["success"] else "[red]✗[/red]", + f"{result['duration']:.2f}", + result["error"] or "" + ) + + console.print(table) + + # Print summary + success_count = sum(1 for result in results if result["success"]) + console.print(f"[bold]Summary:[/bold] {success_count}/{len(results)} repositories analyzed successfully") + + +if __name__ == "__main__": + main() + diff --git a/codegen-on-oss/tests/test_codebase_analyzer.py b/codegen-on-oss/tests/test_codebase_analyzer.py new file mode 100644 index 000000000..fb372387c --- /dev/null +++ b/codegen-on-oss/tests/test_codebase_analyzer.py @@ -0,0 +1,73 @@ +""" 
+Tests for the codebase analyzer. +""" + +import os +import sys +import unittest +from unittest.mock import MagicMock, patch + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from codegen_on_oss.analysis.codebase_analyzer import CodebaseAnalyzer + + +class TestCodebaseAnalyzer(unittest.TestCase): + """Test cases for the CodebaseAnalyzer class.""" + + @patch('codegen_on_oss.analysis.codebase_analyzer.Codebase') + def test_init_from_path(self, mock_codebase): + """Test initializing the analyzer from a local path.""" + # Setup + mock_codebase_instance = MagicMock() + mock_codebase.return_value = mock_codebase_instance + + # Execute + analyzer = CodebaseAnalyzer(repo_path='/path/to/repo') + + # Assert + self.assertEqual(analyzer.repo_path, '/path/to/repo') + self.assertIsNone(analyzer.repo_url) + self.assertEqual(analyzer.codebase, mock_codebase_instance) + + @patch('codegen_on_oss.analysis.codebase_analyzer.Codebase') + def test_init_from_url(self, mock_codebase): + """Test initializing the analyzer from a URL.""" + # Setup + mock_codebase_instance = MagicMock() + mock_codebase.from_github.return_value = mock_codebase_instance + + # Execute + analyzer = CodebaseAnalyzer(repo_url='https://github.com/username/repo') + + # Assert + self.assertEqual(analyzer.repo_url, 'https://github.com/username/repo') + self.assertIsNone(analyzer.repo_path) + self.assertEqual(analyzer.codebase, mock_codebase_instance) + + @patch('codegen_on_oss.analysis.codebase_analyzer.CodebaseAnalyzer._init_from_url') + @patch('codegen_on_oss.analysis.codebase_analyzer.CodebaseAnalyzer._init_from_path') + def test_init_priority(self, mock_init_from_path, mock_init_from_url): + """Test that URL initialization takes priority over path.""" + # Execute + CodebaseAnalyzer(repo_url='https://github.com/username/repo', repo_path='/path/to/repo') + + # Assert + mock_init_from_url.assert_called_once() + mock_init_from_path.assert_not_called() + + @patch('codegen_on_oss.analysis.codebase_analyzer.CodebaseAnalyzer._init_from_path') + def test_analyze_without_init(self, _): + """Test that analyze raises an error if codebase is not initialized.""" + # Setup + analyzer = CodebaseAnalyzer(repo_path='/path/to/repo') + analyzer.codebase = None + + # Execute and Assert + with self.assertRaises(ValueError): + analyzer.analyze() + + +if __name__ == '__main__': + unittest.main() +