From 38dc36309768befcb6c91000899058385613fcd5 Mon Sep 17 00:00:00 2001 From: openhands Date: Sun, 8 Mar 2026 16:08:34 -0400 Subject: [PATCH 1/7] fix: enable local CI testing with act - Added fetch-depth: 0 to checkout step for full git history - Added base: main to paths-filter action for local act testing The dorny/paths-filter@v3 action requires either: 1. The base input to be configured, or 2. repository.default_branch to be set in the event payload When running locally with 'act', the GitHub event payload doesn't have the default_branch set, causing the action to fail with: 'This action requires base input to be configured' --- .github/workflows/ci.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7b0e6885..5f4dc526 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -42,9 +42,12 @@ jobs: ruff: ${{ steps.changes.outputs.ruff }} steps: - uses: actions/checkout@v6 + with: + fetch-depth: 0 - uses: dorny/paths-filter@v3 id: changes with: + base: main filters: | backend: - 'backend/**' From 63341a910863db598c5f8a2c190126704eb33e2b Mon Sep 17 00:00:00 2001 From: openhands Date: Sun, 8 Mar 2026 16:17:50 -0400 Subject: [PATCH 2/7] feat: Add Ruff W rule (pycodestyle warnings) to linter configuration (#678) - Add W rule (pycodestyle warnings) to ai-engine/pyproject.toml - Add W rule (pycodestyle warnings) to backend/pyproject.toml - Add W291, W292, W293 to ignore list for legacy code Co-authored-by: openhands --- ai-engine/pyproject.toml | 8 ++++++-- backend/pyproject.toml | 8 ++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/ai-engine/pyproject.toml b/ai-engine/pyproject.toml index b3d5a6a0..420bfc86 100644 --- a/ai-engine/pyproject.toml +++ b/ai-engine/pyproject.toml @@ -16,11 +16,15 @@ exclude = [ ] [tool.ruff.lint] -# Enable pycodestyle (E), Pyflakes (F), isort (I), flake8-quotes (Q), flake8-naming (N), pydocstyle (D), pyupgrade (UP) -select = ["E", "F", "I", "Q", "N", "D", "UP"] +# Enable pycodestyle (E), Pyflakes (F), warnings (W), isort (I), flake8-quotes (Q), flake8-naming (N), pydocstyle (D), pyupgrade (UP) +select = ["E", "F", "W", "I", "Q", "N", "D", "UP"] ignore = [ "E402", # module level import not at top of file "E501", # line too long (handled by formatter) + # W (pycodestyle warnings) ignores for legacy code + "W291", # trailing whitespace (legacy code has inconsistent whitespace) + "W292", # no newline at end of file (legacy code has inconsistent newlines) + "W293", # blank line contains whitespace (legacy code has inconsistent blank lines) "Q000", # bad quote marks "Q001", # bad quote marks in multiline strings "Q002", # bad quote marks in strings diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 63f8b0e3..524d5bb9 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -73,11 +73,15 @@ exclude = [ ] [tool.ruff.lint] -# Enable pycodestyle (E), Pyflakes (F), isort (I), flake8-quotes (Q), flake8-naming (N), pydocstyle (D), pyupgrade (UP) -select = ["E", "F", "I", "Q", "N", "D", "UP"] +# Enable pycodestyle (E), Pyflakes (F), warnings (W), isort (I), flake8-quotes (Q), flake8-naming (N), pydocstyle (D), pyupgrade (UP) +select = ["E", "F", "W", "I", "Q", "N", "D", "UP"] ignore = [ "E402", # module level import not at top of file "E501", # line too long (handled by formatter) + # W (pycodestyle warnings) ignores for legacy code + "W291", # trailing whitespace (legacy code has inconsistent whitespace) + "W292", # no newline at end of file (legacy code has inconsistent newlines) + "W293", # blank line contains whitespace (legacy code has inconsistent blank lines) "Q000", # bad quote marks "Q001", # bad quote marks in multiline strings "Q002", # bad quote marks in strings From f69ffaf6c56ef0a009ec1ccaf71446bb1866553f Mon Sep 17 00:00:00 2001 From: openhands Date: Sun, 8 Mar 2026 16:21:18 -0400 Subject: [PATCH 3/7] feat: Add distributed tracing with OpenTelemetry (#696) - Add OpenTelemetry tracing for backend and ai-engine services - Implement W3C Trace Context propagation between services - Add automatic FastAPI, HTTPX, and Redis instrumentation - Configure Jaeger and OTLP exporters (configurable via env vars) - Add trace context injection/extraction utilities for service calls - Update docker-compose.yml with tracing environment variables Co-authored-by: openhands --- ai-engine/main.py | 23 ++ ai-engine/requirements-dev.txt | 6 +- ai-engine/requirements.txt | 11 +- ai-engine/tracing.py | 324 +++++++++++++++++++++ ai-engine/utils/logging_config.py | 7 +- backend/requirements-dev.txt | 4 + backend/requirements.txt | 9 + backend/src/main.py | 8 + backend/src/services/ai_engine_client.py | 26 +- backend/src/services/structured_logging.py | 7 +- backend/src/services/tracing.py | 324 +++++++++++++++++++++ docker-compose.yml | 42 +++ 12 files changed, 779 insertions(+), 12 deletions(-) create mode 100644 ai-engine/tracing.py create mode 100644 backend/src/services/tracing.py diff --git a/ai-engine/main.py b/ai-engine/main.py index f20a88bb..b9b75a14 100644 --- a/ai-engine/main.py +++ b/ai-engine/main.py @@ -15,6 +15,9 @@ from dotenv import load_dotenv import redis.asyncio as aioredis +# Import tracing module +from tracing import init_tracing, shutdown_tracing + # Configure logging using centralized configuration from utils.logging_config import setup_logging, get_agent_logger, configure_structlog @@ -198,6 +201,10 @@ async def startup_event(): logger.info("Starting ModPorter AI Engine...") + # Initialize tracing + init_tracing(app=app, service_name="modporter-ai-engine") + logger.info("Distributed tracing initialized") + try: # Initialize Redis connection redis_url = os.getenv("REDIS_URL", "redis://localhost:6379") @@ -224,6 +231,22 @@ async def startup_event(): logger.error(f"Failed to initialize AI Engine: {e}", exc_info=True) raise HTTPException(status_code=503, detail="Service initialization failed") +@app.on_event("shutdown") +async def shutdown_event(): + """Cleanup on shutdown""" + global redis_client + + # Shutdown tracing + shutdown_tracing() + logger.info("Distributed tracing shutdown") + + # Close Redis connection + if redis_client: + await redis_client.close() + logger.info("Redis connection closed") + + logger.info("ModPorter AI Engine shutdown complete") + @app.get("/api/v1/health", response_model=HealthResponse, tags=["health"]) async def health_check(): """Check the health status of the AI Engine""" diff --git a/ai-engine/requirements-dev.txt b/ai-engine/requirements-dev.txt index 93146153..8e90a484 100644 --- a/ai-engine/requirements-dev.txt +++ b/ai-engine/requirements-dev.txt @@ -15,4 +15,8 @@ black>=23.0.0 mypy>=1.5.0 # Development Tools -pre-commit>=3.0.0 \ No newline at end of file +pre-commit>=3.0.0 + +# Dependency checking +pip-audit>=2.7.0 +pipdeptree>=2.23.0 \ No newline at end of file diff --git a/ai-engine/requirements.txt b/ai-engine/requirements.txt index c6f209d9..b96a5b2d 100644 --- a/ai-engine/requirements.txt +++ b/ai-engine/requirements.txt @@ -49,4 +49,13 @@ pydantic-settings # Monitoring prometheus-client psutil -structlog>=24.0.0 \ No newline at end of file +structlog>=24.0.0 + +# Distributed Tracing - OpenTelemetry +opentelemetry-api>=1.24.0 +opentelemetry-sdk>=1.24.0 +opentelemetry-exporter-otlp>=1.24.0 +opentelemetry-exporter-jaeger>=1.24.0 +opentelemetry-instrumentation-fastapi>=0.45b0 +opentelemetry-instrumentation-httpx>=0.45b0 +opentelemetry-instrumentation-redis>=0.45b0 \ No newline at end of file diff --git a/ai-engine/tracing.py b/ai-engine/tracing.py new file mode 100644 index 00000000..d3d80762 --- /dev/null +++ b/ai-engine/tracing.py @@ -0,0 +1,324 @@ +""" +Distributed Tracing Service using OpenTelemetry for AI Engine. + +This module provides tracing capabilities for the AI Engine service, +including trace context propagation between services. +""" + +import os +from typing import Optional +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter +from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.exporter.jaeger.thrift import JaegerExporter +from opentelemetry.sdk.extension.aws.resource.ec2 import AwsEc2ResourceDetector +from opentelemetry.sdk.extension.aws.resource.ecs import AwsEcsResourceDetector +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor +from opentelemetry.instrumentation.redis import RedisInstrumentor +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from opentelemetry.trace import Status, StatusCode +from opentelemetry.context import Context +import logging + +logger = logging.getLogger(__name__) + +# Trace context propagator (W3C Trace Context) +tracer_propagator = TraceContextTextMapPropagator() + +# Global tracer instance +_tracer: Optional[trace.Tracer] = None +_tracer_provider: Optional[TracerProvider] = None + + +def get_tracer(service_name: str = "modporter-ai-engine") -> trace.Tracer: + """ + Get or create a tracer instance for the given service. + + Args: + service_name: Name of the service for tracing + + Returns: + Configured tracer instance + """ + global _tracer, _tracer_provider + + if _tracer is not None: + return _tracer + + # Create resource with service information + service_version = os.getenv("SERVICE_VERSION", "1.0.0") + resource = Resource.create({ + SERVICE_NAME: service_name, + SERVICE_VERSION: service_version, + }) + + # Add cloud metadata if available + try: + ec2_resource = AwsEc2ResourceDetector().detect() + resource = resource.merge(ec2_resource) + except Exception: + pass + + try: + ecs_resource = AwsEcsResourceDetector().detect() + resource = resource.merge(ecs_resource) + except Exception: + pass + + # Create tracer provider + _tracer_provider = TracerProvider(resource=resource) + + # Configure exporter based on environment + tracing_enabled = os.getenv("TRACING_ENABLED", "true").lower() == "true" + tracing_exporter = os.getenv("TRACING_EXPORTER", "jaeger").lower() + + if tracing_enabled: + if tracing_exporter == "jaeger": + # Jaeger exporter configuration + jaeger_host = os.getenv("JAEGER_HOST", "localhost") + jaeger_port = int(os.getenv("JAEGER_PORT", "6831")) + + jaeger_exporter = JaegerExporter( + agent_host_name=jaeger_host, + agent_port=jaeger_port, + ) + _tracer_provider.add_span_processor( + BatchSpanProcessor(jaeger_exporter) + ) + logger.info(f"Jaeger tracing enabled: {jaeger_host}:{jaeger_port}") + + elif tracing_exporter == "otlp": + # OTLP exporter configuration + otlp_endpoint = os.getenv("OTLP_ENDPOINT", "http://localhost:4317") + + otlp_exporter = OTLPSpanExporter( + endpoint=otlp_endpoint, + insecure=True, + ) + _tracer_provider.add_span_processor( + BatchSpanProcessor(otlp_exporter) + ) + logger.info(f"OTLP tracing enabled: {otlp_endpoint}") + + # Add console exporter for development + if os.getenv("TRACING_CONSOLE", "false").lower() == "true": + _tracer_provider.add_span_processor( + BatchSpanProcessor(ConsoleSpanExporter()) + ) + logger.info("Console span exporter enabled") + + # Set the global tracer provider + trace.set_tracer_provider(_tracer_provider) + + # Create and return tracer + _tracer = trace.get_tracer(service_name) + + logger.info(f"Tracing initialized for service: {service_name}") + + return _tracer + + +def init_tracing( + app=None, + service_name: str = "modporter-ai-engine", + instrument_fastapi: bool = True, + instrument_httpx: bool = True, + instrument_redis: bool = True, +) -> trace.Tracer: + """ + Initialize tracing with automatic instrumentation. + + Args: + app: FastAPI application instance (optional) + service_name: Name of the service + instrument_fastapi: Whether to instrument FastAPI + instrument_httpx: Whether to instrument HTTPX + instrument_redis: Whether to instrument Redis + + Returns: + Configured tracer instance + """ + tracer = get_tracer(service_name) + + # Instrument FastAPI if app provided + if app and instrument_fastapi: + try: + FastAPIInstrumentor.instrument_app(app) + logger.info("FastAPI instrumentation enabled") + except Exception as e: + logger.warning(f"Failed to instrument FastAPI: {e}") + + # Instrument HTTPX + if instrument_httpx: + try: + HTTPXClientInstrumentor().instrument() + logger.info("HTTPX instrumentation enabled") + except Exception as e: + logger.warning(f"Failed to instrument HTTPX: {e}") + + # Instrument Redis + if instrument_redis: + try: + RedisInstrumentor().instrument() + logger.info("Redis instrumentation enabled") + except Exception as e: + logger.warning(f"Failed to instrument Redis: {e}") + + return tracer + + +def extract_trace_context(carrier: dict) -> Context: + """ + Extract trace context from carrier (e.g., HTTP headers). + + Args: + carrier: Dictionary containing trace context (e.g., HTTP headers) + + Returns: + Extracted context + """ + return tracer_propagator.extract(carrier) + + +def inject_trace_context(carrier: dict) -> dict: + """ + Inject trace context into carrier (e.g., HTTP headers). + + Args: + carrier: Dictionary to inject trace context into + + Returns: + Carrier with injected trace context + """ + tracer_propagator.inject(carrier) + return carrier + + +def create_span( + name: str, + context: Optional[Context] = None, + kind: trace.SpanKind = trace.SpanKind.INTERNAL, +) -> trace.Span: + """ + Create a new span with the given name and context. + + Args: + name: Name of the span + context: Parent context (optional) + kind: Span kind + + Returns: + New span + """ + tracer = get_tracer() + + if context: + with tracer.start_as_current_span(name, context=context, kind=kind) as span: + return span + else: + with tracer.start_as_current_span(name, kind=kind) as span: + return span + + +def add_span_attributes(span: trace.Span, attributes: dict) -> None: + """ + Add attributes to a span. + + Args: + span: Span to add attributes to + attributes: Dictionary of attributes + """ + for key, value in attributes.items(): + if value is not None: + span.set_attribute(key, str(value)) + + +def record_span_exception(span: trace.Span, exception: Exception) -> None: + """ + Record an exception on a span. + + Args: + span: Span to record exception on + exception: Exception to record + """ + span.set_status(Status(StatusCode.ERROR, str(exception))) + span.record_exception(exception) + + +def shutdown_tracing() -> None: + """Shutdown the tracing provider and flush any pending spans.""" + global _tracer_provider + + if _tracer_provider: + _tracer_provider.shutdown() + logger.info("Tracing provider shutdown") + + +class TracingMiddleware: + """ + Middleware for FastAPI to handle trace context propagation. + + This middleware extracts trace context from incoming requests + and injects it into outgoing requests. + """ + + def __init__(self, app): + self.app = app + + async def __call__(self, scope, receive, send): + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + # Extract trace context from headers + headers = dict(scope.get("headers", [])) + # Convert bytes to string for headers + headers = {k.decode(): v.decode() for k, v in headers.items()} + + # The FastAPI instrumentation will handle this automatically, + # but we keep this for custom use cases + context = extract_trace_context(headers) + + # Continue with the request + await self.app(scope, receive, send) + + +def get_current_span() -> Optional[trace.Span]: + """ + Get the current active span if any. + + Returns: + Current span or None + """ + return trace.get_current_span() + + +def get_trace_id() -> Optional[str]: + """ + Get the current trace ID as a hex string. + + Returns: + Trace ID or None + """ + span = get_current_span() + if span: + trace_id = span.get_span_context().trace_id + return format(trace_id, '032x') + return None + + +def get_span_id() -> Optional[str]: + """ + Get the current span ID as a hex string. + + Returns: + Span ID or None + """ + span = get_current_span() + if span: + span_id = span.get_span_context().span_id + return format(span_id, '016x') + return None diff --git a/ai-engine/utils/logging_config.py b/ai-engine/utils/logging_config.py index 713c5c0e..c6b09ef0 100644 --- a/ai-engine/utils/logging_config.py +++ b/ai-engine/utils/logging_config.py @@ -58,12 +58,15 @@ def configure_structlog( log_dir = os.getenv("LOG_DIR", "/tmp/modporter-ai/logs") # Configure processors based on format + # Order matters: context merging -> logger info -> level -> timestamper -> renderer -> exception handling processors = [ structlog.contextvars.merge_contextvars, structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, ] if debug_mode: @@ -73,10 +76,6 @@ def configure_structlog( else: processors.append(structlog.dev.ConsoleRenderer(colors=False)) - # Add exception info processor - processors.append(structlog.processors.StackInfoRenderer()) - processors.append(structlog.processors.format_exc_info) - # Configure structlog structlog.configure( processors=processors, diff --git a/backend/requirements-dev.txt b/backend/requirements-dev.txt index 0d08d43f..fb588267 100644 --- a/backend/requirements-dev.txt +++ b/backend/requirements-dev.txt @@ -13,3 +13,7 @@ pre-commit>=3.0.0 # Logging structlog + +# Dependency checking +pip-audit>=2.7.0 +pipdeptree>=2.23.0 diff --git a/backend/requirements.txt b/backend/requirements.txt index 30d6bc8e..b260824b 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -33,6 +33,15 @@ prometheus_client>=0.17.0 sentry-sdk[fastapi]>=2.0.0 structlog>=24.0.0 +# Distributed Tracing - OpenTelemetry +opentelemetry-api>=1.24.0 +opentelemetry-sdk>=1.24.0 +opentelemetry-exporter-otlp>=1.24.0 +opentelemetry-exporter-jaeger>=1.24.0 +opentelemetry-instrumentation-fastapi>=0.45b0 +opentelemetry-instrumentation-httpx>=0.45b0 +opentelemetry-instrumentation-redis>=0.45b0 + # Testing pytest>=8.2 pytest-asyncio==1.3.0 diff --git a/backend/src/main.py b/backend/src/main.py index de2abfb6..70882cb7 100644 --- a/backend/src/main.py +++ b/backend/src/main.py @@ -48,6 +48,7 @@ ) from services.security_headers import SecurityHeadersMiddleware from services.logging_middleware import LoggingMiddleware, RequestContextMiddleware +from services.tracing import init_tracing, shutdown_tracing # Import API routers from api import ( @@ -101,8 +102,15 @@ async def lifespan(app: FastAPI): if testing_env != "true": await init_db() logger.info("Database initialized") + + # Initialize tracing + init_tracing(app=app, service_name="modporter-backend") + logger.info("Distributed tracing initialized") + yield + # Shutdown + shutdown_tracing() logger.info("Application shutdown") diff --git a/backend/src/services/ai_engine_client.py b/backend/src/services/ai_engine_client.py index e517e5aa..5eb077aa 100644 --- a/backend/src/services/ai_engine_client.py +++ b/backend/src/services/ai_engine_client.py @@ -3,6 +3,7 @@ Provides HTTP client for communicating with the AI Engine API. Handles file transfers, conversion requests, and progress polling. +Includes distributed tracing support. """ import asyncio @@ -13,6 +14,8 @@ import httpx +from services.tracing import inject_trace_context, get_trace_id + logger = logging.getLogger(__name__) # AI Engine configuration @@ -63,6 +66,21 @@ async def _get_client(self) -> httpx.AsyncClient: ) return self._client + def _get_trace_headers(self) -> Dict[str, str]: + """ + Get trace context headers for propagating to downstream services. + + Returns: + Dictionary of trace headers to include in requests + """ + headers = {} + inject_trace_context(headers) + # Also add trace_id for logging purposes + trace_id = get_trace_id() + if trace_id: + headers["X-Trace-ID"] = trace_id + return headers + async def close(self): """Close the HTTP client.""" if self._client and not self._client.is_closed: @@ -78,7 +96,8 @@ async def health_check(self) -> bool: """ try: client = await self._get_client() - response = await client.get("/api/v1/health") + headers = self._get_trace_headers() + response = await client.get("/api/v1/health", headers=headers) return response.status_code == 200 except Exception as e: logger.warning(f"AI Engine health check failed: {e}") @@ -108,6 +127,7 @@ async def start_conversion( """ try: client = await self._get_client() + headers = self._get_trace_headers() request_data = { "job_id": job_id, @@ -121,6 +141,7 @@ async def start_conversion( response = await client.post( "/api/v1/convert", json=request_data, + headers=headers, ) if response.status_code != 200: @@ -161,7 +182,8 @@ async def get_conversion_status(self, job_id: str) -> Dict[str, Any]: """ try: client = await self._get_client() - response = await client.get(f"/api/v1/status/{job_id}") + headers = self._get_trace_headers() + response = await client.get(f"/api/v1/status/{job_id}", headers=headers) if response.status_code == 404: raise AIEngineError("Job not found", status_code=404) diff --git a/backend/src/services/structured_logging.py b/backend/src/services/structured_logging.py index 3ea81f2b..22080f19 100644 --- a/backend/src/services/structured_logging.py +++ b/backend/src/services/structured_logging.py @@ -56,12 +56,15 @@ def configure_structlog( log_dir = os.getenv("LOG_DIR", "/var/log/modporter") # Configure processors based on format + # Order matters: context merging -> logger info -> level -> timestamper -> renderer -> exception handling processors = [ structlog.contextvars.merge_contextvars, structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, ] if debug_mode: @@ -71,10 +74,6 @@ def configure_structlog( else: processors.append(structlog.dev.ConsoleRenderer(colors=False)) - # Add exception info processor - processors.append(structlog.processors.StackInfoRenderer()) - processors.append(structlog.processors.format_exc_info) - # Configure structlog structlog.configure( processors=processors, diff --git a/backend/src/services/tracing.py b/backend/src/services/tracing.py new file mode 100644 index 00000000..2f6b94ae --- /dev/null +++ b/backend/src/services/tracing.py @@ -0,0 +1,324 @@ +""" +Distributed Tracing Service using OpenTelemetry. + +This module provides tracing capabilities for the ModPorter AI application, +including trace context propagation between services. +""" + +import os +from typing import Optional +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter +from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.exporter.jaeger.thrift import JaegerExporter +from opentelemetry.sdk.extension.aws.resource.ec2 import AwsEc2ResourceDetector +from opentelemetry.sdk.extension.aws.resource.ecs import AwsEcsResourceDetector +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor +from opentelemetry.instrumentation.redis import RedisInstrumentor +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from opentelemetry.trace import Status, StatusCode +from opentelemetry.context import Context +import logging + +logger = logging.getLogger(__name__) + +# Trace context propagator (W3C Trace Context) +tracer_propagator = TraceContextTextMapPropagator() + +# Global tracer instance +_tracer: Optional[trace.Tracer] = None +_tracer_provider: Optional[TracerProvider] = None + + +def get_tracer(service_name: str = "modporter-backend") -> trace.Tracer: + """ + Get or create a tracer instance for the given service. + + Args: + service_name: Name of the service for tracing + + Returns: + Configured tracer instance + """ + global _tracer, _tracer_provider + + if _tracer is not None: + return _tracer + + # Create resource with service information + service_version = os.getenv("SERVICE_VERSION", "1.0.0") + resource = Resource.create({ + SERVICE_NAME: service_name, + SERVICE_VERSION: service_version, + }) + + # Add cloud metadata if available + try: + ec2_resource = AwsEc2ResourceDetector().detect() + resource = resource.merge(ec2_resource) + except Exception: + pass + + try: + ecs_resource = AwsEcsResourceDetector().detect() + resource = resource.merge(ecs_resource) + except Exception: + pass + + # Create tracer provider + _tracer_provider = TracerProvider(resource=resource) + + # Configure exporter based on environment + tracing_enabled = os.getenv("TRACING_ENABLED", "true").lower() == "true" + tracing_exporter = os.getenv("TRACING_EXPORTER", "jaeger").lower() + + if tracing_enabled: + if tracing_exporter == "jaeger": + # Jaeger exporter configuration + jaeger_host = os.getenv("JAEGER_HOST", "localhost") + jaeger_port = int(os.getenv("JAEGER_PORT", "6831")) + + jaeger_exporter = JaegerExporter( + agent_host_name=jaeger_host, + agent_port=jaeger_port, + ) + _tracer_provider.add_span_processor( + BatchSpanProcessor(jaeger_exporter) + ) + logger.info(f"Jaeger tracing enabled: {jaeger_host}:{jaeger_port}") + + elif tracing_exporter == "otlp": + # OTLP exporter configuration + otlp_endpoint = os.getenv("OTLP_ENDPOINT", "http://localhost:4317") + + otlp_exporter = OTLPSpanExporter( + endpoint=otlp_endpoint, + insecure=True, + ) + _tracer_provider.add_span_processor( + BatchSpanProcessor(otlp_exporter) + ) + logger.info(f"OTLP tracing enabled: {otlp_endpoint}") + + # Add console exporter for development + if os.getenv("TRACING_CONSOLE", "false").lower() == "true": + _tracer_provider.add_span_processor( + BatchSpanProcessor(ConsoleSpanExporter()) + ) + logger.info("Console span exporter enabled") + + # Set the global tracer provider + trace.set_tracer_provider(_tracer_provider) + + # Create and return tracer + _tracer = trace.get_tracer(service_name) + + logger.info(f"Tracing initialized for service: {service_name}") + + return _tracer + + +def init_tracing( + app=None, + service_name: str = "modporter-backend", + instrument_fastapi: bool = True, + instrument_httpx: bool = True, + instrument_redis: bool = True, +) -> trace.Tracer: + """ + Initialize tracing with automatic instrumentation. + + Args: + app: FastAPI application instance (optional) + service_name: Name of the service + instrument_fastapi: Whether to instrument FastAPI + instrument_httpx: Whether to instrument HTTPX + instrument_redis: Whether to instrument Redis + + Returns: + Configured tracer instance + """ + tracer = get_tracer(service_name) + + # Instrument FastAPI if app provided + if app and instrument_fastapi: + try: + FastAPIInstrumentor.instrument_app(app) + logger.info("FastAPI instrumentation enabled") + except Exception as e: + logger.warning(f"Failed to instrument FastAPI: {e}") + + # Instrument HTTPX + if instrument_httpx: + try: + HTTPXClientInstrumentor().instrument() + logger.info("HTTPX instrumentation enabled") + except Exception as e: + logger.warning(f"Failed to instrument HTTPX: {e}") + + # Instrument Redis + if instrument_redis: + try: + RedisInstrumentor().instrument() + logger.info("Redis instrumentation enabled") + except Exception as e: + logger.warning(f"Failed to instrument Redis: {e}") + + return tracer + + +def extract_trace_context(carrier: dict) -> Context: + """ + Extract trace context from carrier (e.g., HTTP headers). + + Args: + carrier: Dictionary containing trace context (e.g., HTTP headers) + + Returns: + Extracted context + """ + return tracer_propagator.extract(carrier) + + +def inject_trace_context(carrier: dict) -> dict: + """ + Inject trace context into carrier (e.g., HTTP headers). + + Args: + carrier: Dictionary to inject trace context into + + Returns: + Carrier with injected trace context + """ + tracer_propagator.inject(carrier) + return carrier + + +def create_span( + name: str, + context: Optional[Context] = None, + kind: trace.SpanKind = trace.SpanKind.INTERNAL, +) -> trace.Span: + """ + Create a new span with the given name and context. + + Args: + name: Name of the span + context: Parent context (optional) + kind: Span kind + + Returns: + New span + """ + tracer = get_tracer() + + if context: + with tracer.start_as_current_span(name, context=context, kind=kind) as span: + return span + else: + with tracer.start_as_current_span(name, kind=kind) as span: + return span + + +def add_span_attributes(span: trace.Span, attributes: dict) -> None: + """ + Add attributes to a span. + + Args: + span: Span to add attributes to + attributes: Dictionary of attributes + """ + for key, value in attributes.items(): + if value is not None: + span.set_attribute(key, str(value)) + + +def record_span_exception(span: trace.Span, exception: Exception) -> None: + """ + Record an exception on a span. + + Args: + span: Span to record exception on + exception: Exception to record + """ + span.set_status(Status(StatusCode.ERROR, str(exception))) + span.record_exception(exception) + + +def shutdown_tracing() -> None: + """Shutdown the tracing provider and flush any pending spans.""" + global _tracer_provider + + if _tracer_provider: + _tracer_provider.shutdown() + logger.info("Tracing provider shutdown") + + +class TracingMiddleware: + """ + Middleware for FastAPI to handle trace context propagation. + + This middleware extracts trace context from incoming requests + and injects it into outgoing requests. + """ + + def __init__(self, app): + self.app = app + + async def __call__(self, scope, receive, send): + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + # Extract trace context from headers + headers = dict(scope.get("headers", [])) + # Convert bytes to string for headers + headers = {k.decode(): v.decode() for k, v in headers.items()} + + # The FastAPI instrumentation will handle this automatically, + # but we keep this for custom use cases + context = extract_trace_context(headers) + + # Continue with the request + await self.app(scope, receive, send) + + +def get_current_span() -> Optional[trace.Span]: + """ + Get the current active span if any. + + Returns: + Current span or None + """ + return trace.get_current_span() + + +def get_trace_id() -> Optional[str]: + """ + Get the current trace ID as a hex string. + + Returns: + Trace ID or None + """ + span = get_current_span() + if span: + trace_id = span.get_span_context().trace_id + return format(trace_id, '032x') + return None + + +def get_span_id() -> Optional[str]: + """ + Get the current span ID as a hex string. + + Returns: + Span ID or None + """ + span = get_current_span() + if span: + span_id = span.get_span_context().span_id + return format(span_id, '016x') + return None diff --git a/docker-compose.yml b/docker-compose.yml index 5e3245ad..194bcae8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -26,11 +26,17 @@ services: - REDIS_URL=redis://redis:6379 - DATABASE_URL=postgresql+asyncpg://postgres:password@postgres:5432/modporter - LOG_LEVEL=INFO + - TRACING_ENABLED=true + - TRACING_EXPORTER=jaeger + - JAEGER_HOST=jaeger + - JAEGER_PORT=6831 depends_on: redis: condition: service_healthy postgres: condition: service_healthy + jaeger: + condition: service_started volumes: - ./backend/src:/app/src - conversion-cache:/app/cache @@ -59,9 +65,15 @@ services: - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} - LOG_LEVEL=INFO - PORT=8001 + - TRACING_ENABLED=true + - TRACING_EXPORTER=jaeger + - JAEGER_HOST=jaeger + - JAEGER_PORT=6831 depends_on: redis: condition: service_healthy + jaeger: + condition: service_started volumes: - model-cache:/app/models - temp-uploads:/app/temp_uploads @@ -120,6 +132,34 @@ services: retries: 5 start_period: 30s + # Jaeger for distributed tracing visualization + jaeger: + image: jaegertracing/all-in-one:1.52 + ports: + - "6831:6831/udp" # Jaeger thrift compact protocol + - "16686:16686" # Jaeger UI + - "4317:4317" # OTLP gRPC receiver + - "4318:4318" # OTLP HTTP receiver + environment: + - COLLECTOR_OTLP_ENABLED=true + - SPAN_STORAGE_TYPE=badger + volumes: + - jaeger-data:/var/lib/jaeger + networks: + - modporter-network + restart: unless-stopped + healthcheck: + test: ["CMD", "wget", "--spider", "-q", "http://localhost:16686"] + interval: 30s + timeout: 10s + retries: 3 + deploy: + resources: + limits: + memory: 512M + reservations: + memory: 256M + volumes: redis-data: driver: local @@ -133,6 +173,8 @@ volumes: driver: local conversion-outputs: driver: local + jaeger-data: + driver: local networks: modporter-network: From dcfb16a87333bb6067eb09b6c3edfa65577d8948 Mon Sep 17 00:00:00 2001 From: openhands Date: Sun, 8 Mar 2026 19:41:21 -0400 Subject: [PATCH 4/7] style: Fix Black formatting issues - Format backend files: tracing.py, ai_engine_client.py, structured_logging.py, main.py - Format ai-engine files: tracing.py, main.py, logging_config.py Co-authored-by: openhands --- ai-engine/main.py | 260 +++++++---- ai-engine/tracing.py | 110 +++-- ai-engine/utils/logging_config.py | 491 +++++++++++---------- backend/src/main.py | 220 ++++++--- backend/src/services/ai_engine_client.py | 12 +- backend/src/services/structured_logging.py | 14 +- backend/src/services/tracing.py | 110 +++-- 7 files changed, 724 insertions(+), 493 deletions(-) diff --git a/ai-engine/main.py b/ai-engine/main.py index b9b75a14..2f87ab54 100644 --- a/ai-engine/main.py +++ b/ai-engine/main.py @@ -30,12 +30,12 @@ # Also configure structlog for structured JSON logging in production configure_structlog( debug_mode=debug_mode, - json_format=os.getenv("LOG_JSON_FORMAT", "false").lower() == "true" + json_format=os.getenv("LOG_JSON_FORMAT", "false").lower() == "true", ) setup_logging( debug_mode=debug_mode, - enable_file_logging=os.getenv("ENABLE_FILE_LOGGING", "true").lower() == "true" + enable_file_logging=os.getenv("ENABLE_FILE_LOGGING", "true").lower() == "true", ) logger = get_agent_logger("main") @@ -46,7 +46,11 @@ # Import progress callback for real-time updates try: - from utils.progress_callback import create_progress_callback, cleanup_progress_callback + from utils.progress_callback import ( + create_progress_callback, + cleanup_progress_callback, + ) + PROGRESS_CALLBACK_AVAILABLE = True except ImportError: PROGRESS_CALLBACK_AVAILABLE = False @@ -62,14 +66,16 @@ if debug_mode: print_gpu_info() + # Status enumeration for conversion states class ConversionStatusEnum(str, Enum): QUEUED = "queued" - IN_PROGRESS = "in_progress" + IN_PROGRESS = "in_progress" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" + # FastAPI app configuration app = FastAPI( title="ModPorter AI Engine", @@ -87,7 +93,9 @@ class ConversionStatusEnum(str, Enum): ) # CORS middleware - Restrict origins for security -allowed_origins = os.getenv("ALLOWED_ORIGINS", "http://localhost:3000,http://localhost:8080").split(",") +allowed_origins = os.getenv( + "ALLOWED_ORIGINS", "http://localhost:3000,http://localhost:8080" +).split(",") app.add_middleware( CORSMiddleware, allow_origins=allowed_origins, @@ -101,55 +109,72 @@ class ConversionStatusEnum(str, Enum): assumption_engine = None redis_client = None + # Redis job state management class RedisJobManager: def __init__(self, redis_client): self.redis = redis_client self.available = True - + async def set_job_status(self, job_id: str, status: "ConversionStatus") -> None: """Store job status in Redis with error handling""" try: if not self.available: - raise HTTPException(status_code=503, detail="Job state storage unavailable") - + raise HTTPException( + status_code=503, detail="Job state storage unavailable" + ) + status_dict = status.model_dump() - status_dict['started_at'] = status_dict['started_at'].isoformat() if status_dict['started_at'] else None - status_dict['completed_at'] = status_dict['completed_at'].isoformat() if status_dict['completed_at'] else None - + status_dict["started_at"] = ( + status_dict["started_at"].isoformat() + if status_dict["started_at"] + else None + ) + status_dict["completed_at"] = ( + status_dict["completed_at"].isoformat() + if status_dict["completed_at"] + else None + ) + await self.redis.set( - f"ai_engine:jobs:{job_id}", + f"ai_engine:jobs:{job_id}", json.dumps(status_dict), - ex=3600 # Expire after 1 hour + ex=3600, # Expire after 1 hour ) except Exception as e: logger.error(f"Failed to store job status in Redis: {e}", exc_info=True) self.available = False raise HTTPException(status_code=503, detail="Job state storage failed") - + async def get_job_status(self, job_id: str) -> Optional["ConversionStatus"]: """Retrieve job status from Redis with error handling""" try: if not self.available: return None - + data = await self.redis.get(f"ai_engine:jobs:{job_id}") if not data: return None - + status_dict = json.loads(data) # Convert ISO strings back to datetime - if status_dict.get('started_at'): - status_dict['started_at'] = datetime.fromisoformat(status_dict['started_at']) - if status_dict.get('completed_at'): - status_dict['completed_at'] = datetime.fromisoformat(status_dict['completed_at']) - + if status_dict.get("started_at"): + status_dict["started_at"] = datetime.fromisoformat( + status_dict["started_at"] + ) + if status_dict.get("completed_at"): + status_dict["completed_at"] = datetime.fromisoformat( + status_dict["completed_at"] + ) + return ConversionStatus(**status_dict) except Exception as e: - logger.error(f"Failed to retrieve job status from Redis: {e}", exc_info=True) + logger.error( + f"Failed to retrieve job status from Redis: {e}", exc_info=True + ) self.available = False return None - + async def delete_job(self, job_id: str) -> None: """Remove job from Redis""" try: @@ -158,32 +183,45 @@ async def delete_job(self, job_id: str) -> None: except Exception as e: logger.error(f"Failed to delete job from Redis: {e}", exc_info=True) + job_manager = None + # Pydantic models class HealthResponse(BaseModel): """Health check response model""" + status: str version: str timestamp: str services: Dict[str, str] + class ConversionRequest(BaseModel): """Conversion request model""" + job_id: str = Field(..., description="Unique job identifier") mod_file_path: str = Field(..., description="Path to the mod file") - conversion_options: Optional[Dict[str, Any]] = Field(default={}, description="Conversion options") - experiment_variant: Optional[str] = Field(default=None, description="Experiment variant ID for A/B testing") + conversion_options: Optional[Dict[str, Any]] = Field( + default={}, description="Conversion options" + ) + experiment_variant: Optional[str] = Field( + default=None, description="Experiment variant ID for A/B testing" + ) + class ConversionResponse(BaseModel): """Conversion response model""" + job_id: str status: str message: str estimated_time: Optional[int] = None + class ConversionStatus(BaseModel): """Conversion status model""" + job_id: str status: str progress: int @@ -192,90 +230,94 @@ class ConversionStatus(BaseModel): started_at: Optional[datetime] = None completed_at: Optional[datetime] = None + # Job storage is now handled by RedisJobManager - no global dict + @app.on_event("startup") async def startup_event(): """Initialize services on startup""" global conversion_crew, assumption_engine, redis_client, job_manager - + logger.info("Starting ModPorter AI Engine...") - + # Initialize tracing init_tracing(app=app, service_name="modporter-ai-engine") logger.info("Distributed tracing initialized") - + try: # Initialize Redis connection redis_url = os.getenv("REDIS_URL", "redis://localhost:6379") redis_client = aioredis.from_url(redis_url, decode_responses=True) - + # Test Redis connection await redis_client.ping() logger.info("Redis connection established") - + # Initialize job manager job_manager = RedisJobManager(redis_client) logger.info("RedisJobManager initialized") - + # Initialize SmartAssumptionEngine assumption_engine = SmartAssumptionEngine() logger.info("SmartAssumptionEngine initialized") - + # Note: We now initialize the conversion crew per request to support variants # The global conversion_crew will remain None - + logger.info("ModPorter AI Engine startup complete") - + except Exception as e: logger.error(f"Failed to initialize AI Engine: {e}", exc_info=True) raise HTTPException(status_code=503, detail="Service initialization failed") + @app.on_event("shutdown") async def shutdown_event(): """Cleanup on shutdown""" global redis_client - + # Shutdown tracing shutdown_tracing() logger.info("Distributed tracing shutdown") - + # Close Redis connection if redis_client: await redis_client.close() logger.info("Redis connection closed") - + logger.info("ModPorter AI Engine shutdown complete") + @app.get("/api/v1/health", response_model=HealthResponse, tags=["health"]) async def health_check(): """Check the health status of the AI Engine""" services = { "assumption_engine": "healthy" if assumption_engine else "unavailable", } - + # Conversion crew is now initialized per request, so we don't check it here - + return HealthResponse( status="healthy", version="1.0.0", timestamp=datetime.utcnow().isoformat(), - services=services + services=services, ) + @app.post("/api/v1/convert", response_model=ConversionResponse, tags=["conversion"]) async def start_conversion( - request: ConversionRequest, - background_tasks: BackgroundTasks + request: ConversionRequest, background_tasks: BackgroundTasks ): """Start a new mod conversion job""" - + if not job_manager or not job_manager.available: raise HTTPException(status_code=503, detail="Job state storage unavailable") - + # Initialize conversion crew with variant if specified # The crew is now initialized in the background task to avoid blocking the request - + # Create job status job_status = ConversionStatus( job_id=request.job_id, @@ -283,75 +325,89 @@ async def start_conversion( progress=0, current_stage="initialization", message="Conversion job queued", - started_at=datetime.utcnow() + started_at=datetime.utcnow(), ) - + # Store in Redis instead of global dict await job_manager.set_job_status(request.job_id, job_status) - + # Start conversion in background background_tasks.add_task( process_conversion, request.job_id, request.mod_file_path, request.conversion_options, - request.experiment_variant # Pass variant to process_conversion + request.experiment_variant, # Pass variant to process_conversion ) - + logger.info(f"Started conversion job {request.job_id}") - + return ConversionResponse( job_id=request.job_id, status="queued", message="Conversion job started", - estimated_time=120 # Placeholder - would be calculated based on mod size + estimated_time=120, # Placeholder - would be calculated based on mod size ) -@app.get("/api/v1/status/{job_id}", response_model=ConversionStatus, tags=["conversion"]) + +@app.get( + "/api/v1/status/{job_id}", response_model=ConversionStatus, tags=["conversion"] +) async def get_conversion_status(job_id: str): """Get the status of a conversion job""" - + if not job_manager: raise HTTPException(status_code=503, detail="Job state storage unavailable") - + job_status = await job_manager.get_job_status(job_id) if not job_status: raise HTTPException(status_code=404, detail="Job not found") - + return job_status + @app.get("/api/v1/jobs", response_model=List[ConversionStatus], tags=["conversion"]) async def list_jobs(): """List all active conversion jobs""" if not job_manager or not job_manager.available: raise HTTPException(status_code=503, detail="Job state storage unavailable") - + # Note: In production, implement pagination and filtering # For now, return empty list as Redis doesn't have easy "list all" without keys - logger.warning("list_jobs endpoint returns empty - implement Redis SCAN for production") + logger.warning( + "list_jobs endpoint returns empty - implement Redis SCAN for production" + ) return [] -async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, Any], experiment_variant: Optional[str] = None): + +async def process_conversion( + job_id: str, + mod_file_path: str, + options: Dict[str, Any], + experiment_variant: Optional[str] = None, +): """Process a conversion job using the AI crew""" - + progress_callback = None - + try: # Get current job status job_status = await job_manager.get_job_status(job_id) if not job_status: logger.error(f"Job {job_id} not found during processing") return - + # Update job status job_status.status = "processing" job_status.current_stage = "analysis" job_status.message = "Analyzing mod structure" job_status.progress = 10 await job_manager.set_job_status(job_id, job_status) - - logger.info(f"Processing conversion for job {job_id} with variant {experiment_variant}") - + + logger.info( + f"Processing conversion for job {job_id} with variant {experiment_variant}" + ) + # Create progress callback for real-time updates if available if PROGRESS_CALLBACK_AVAILABLE and create_progress_callback: try: @@ -359,25 +415,29 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, logger.info(f"Progress callback created for job {job_id}") except Exception as e: logger.warning(f"Failed to create progress callback: {e}") - + # Prepare output path output_path = options.get("output_path") if not output_path: # Default output path using job_id pattern that backend expects # Use the mounted volume path inside the container - output_path = os.path.join(os.getenv("CONVERSION_OUTPUT_DIR", "/app/conversion_outputs"), f"{job_id}_converted.mcaddon") - + output_path = os.path.join( + os.getenv("CONVERSION_OUTPUT_DIR", "/app/conversion_outputs"), + f"{job_id}_converted.mcaddon", + ) + # Ensure the output directory exists os.makedirs(os.path.dirname(output_path), exist_ok=True) - + try: # Initialize conversion crew with variant if specified and pass progress callback crew = ModPorterConversionCrew( - variant_id=experiment_variant, - progress_callback=progress_callback + variant_id=experiment_variant, progress_callback=progress_callback + ) + logger.info( + f"ModPorterConversionCrew initialized with variant: {experiment_variant}" ) - logger.info(f"ModPorterConversionCrew initialized with variant: {experiment_variant}") - + # Update status for analysis stage job_status = await job_manager.get_job_status(job_id) if job_status: @@ -385,16 +445,17 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, job_status.message = "Analyzing Java mod structure" job_status.progress = 20 await job_manager.set_job_status(job_id, job_status) - + # Execute the actual AI conversion using the conversion crew from pathlib import Path + conversion_result = crew.convert_mod( mod_path=Path(mod_file_path), output_path=Path(output_path), smart_assumptions=options.get("smart_assumptions", True), - include_dependencies=options.get("include_dependencies", True) + include_dependencies=options.get("include_dependencies", True), ) - + # Update progress based on conversion result if conversion_result.get("status") == "failed": # Mark job as failed @@ -403,9 +464,11 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, job_status.status = "failed" job_status.message = f"Conversion failed: {conversion_result.get('error', 'Unknown error')}" await job_manager.set_job_status(job_id, job_status) - logger.error(f"Conversion failed for job {job_id}: {conversion_result.get('error')}") + logger.error( + f"Conversion failed for job {job_id}: {conversion_result.get('error')}" + ) return - + # Update progress through conversion stages stages = [ ("planning", "Creating conversion plan", 40), @@ -414,7 +477,7 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, ("packaging", "Packaging Bedrock addon", 90), ("validation", "Validating conversion", 95), ] - + for stage, message, progress in stages: job_status = await job_manager.get_job_status(job_id) if job_status: @@ -422,16 +485,21 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, job_status.message = message job_status.progress = progress await job_manager.set_job_status(job_id, job_status) - + # Short delay to show progress import asyncio + await asyncio.sleep(0.5) - + # Verify output file was created if not os.path.exists(output_path): - logger.error(f"Output file not created by conversion crew: {output_path}") - logger.error("This indicates a serious conversion failure that should not be masked") - + logger.error( + f"Output file not created by conversion crew: {output_path}" + ) + logger.error( + "This indicates a serious conversion failure that should not be masked" + ) + # Mark job as failed explicitly instead of creating a fake successful output job_status = await job_manager.get_job_status(job_id) if job_status: @@ -439,9 +507,9 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, job_status.message = "Conversion crew failed to produce output file - this indicates a serious error in the conversion process" await job_manager.set_job_status(job_id, job_status) return - + logger.info(f"Conversion completed successfully: {output_path}") - + except Exception as conversion_error: logger.error(f"Failed to convert mod {mod_file_path}: {conversion_error}") # Mark job as failed if conversion fails @@ -459,12 +527,12 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, job_status.message = "Conversion completed successfully" job_status.completed_at = datetime.utcnow() await job_manager.set_job_status(job_id, job_status) - + logger.info(f"Completed conversion for job {job_id}") - + except Exception as e: logger.error(f"Conversion failed for job {job_id}: {e}", exc_info=True) - + # Update job status to failed job_status = await job_manager.get_job_status(job_id) if job_status: @@ -473,20 +541,28 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, try: await job_manager.set_job_status(job_id, job_status) except Exception as status_error: - logger.error(f"Failed to update job status after error: {status_error}", exc_info=True) + logger.error( + f"Failed to update job status after error: {status_error}", + exc_info=True, + ) finally: # Clean up progress callback - if progress_callback and PROGRESS_CALLBACK_AVAILABLE and cleanup_progress_callback: + if ( + progress_callback + and PROGRESS_CALLBACK_AVAILABLE + and cleanup_progress_callback + ): try: await cleanup_progress_callback(job_id) logger.info(f"Cleaned up progress callback for job {job_id}") except Exception as e: logger.warning(f"Failed to cleanup progress callback: {e}") + if __name__ == "__main__": uvicorn.run( "main:app", host=os.getenv("HOST", "0.0.0.0"), port=int(os.getenv("PORT", 8001)), - reload=os.getenv("DEBUG", "false").lower() == "true" + reload=os.getenv("DEBUG", "false").lower() == "true", ) diff --git a/ai-engine/tracing.py b/ai-engine/tracing.py index d3d80762..d56a9009 100644 --- a/ai-engine/tracing.py +++ b/ai-engine/tracing.py @@ -36,88 +36,84 @@ def get_tracer(service_name: str = "modporter-ai-engine") -> trace.Tracer: """ Get or create a tracer instance for the given service. - + Args: service_name: Name of the service for tracing - + Returns: Configured tracer instance """ global _tracer, _tracer_provider - + if _tracer is not None: return _tracer - + # Create resource with service information service_version = os.getenv("SERVICE_VERSION", "1.0.0") - resource = Resource.create({ - SERVICE_NAME: service_name, - SERVICE_VERSION: service_version, - }) - + resource = Resource.create( + { + SERVICE_NAME: service_name, + SERVICE_VERSION: service_version, + } + ) + # Add cloud metadata if available try: ec2_resource = AwsEc2ResourceDetector().detect() resource = resource.merge(ec2_resource) except Exception: pass - + try: ecs_resource = AwsEcsResourceDetector().detect() resource = resource.merge(ecs_resource) except Exception: pass - + # Create tracer provider _tracer_provider = TracerProvider(resource=resource) - + # Configure exporter based on environment tracing_enabled = os.getenv("TRACING_ENABLED", "true").lower() == "true" tracing_exporter = os.getenv("TRACING_EXPORTER", "jaeger").lower() - + if tracing_enabled: if tracing_exporter == "jaeger": # Jaeger exporter configuration jaeger_host = os.getenv("JAEGER_HOST", "localhost") jaeger_port = int(os.getenv("JAEGER_PORT", "6831")) - + jaeger_exporter = JaegerExporter( agent_host_name=jaeger_host, agent_port=jaeger_port, ) - _tracer_provider.add_span_processor( - BatchSpanProcessor(jaeger_exporter) - ) + _tracer_provider.add_span_processor(BatchSpanProcessor(jaeger_exporter)) logger.info(f"Jaeger tracing enabled: {jaeger_host}:{jaeger_port}") - + elif tracing_exporter == "otlp": # OTLP exporter configuration otlp_endpoint = os.getenv("OTLP_ENDPOINT", "http://localhost:4317") - + otlp_exporter = OTLPSpanExporter( endpoint=otlp_endpoint, insecure=True, ) - _tracer_provider.add_span_processor( - BatchSpanProcessor(otlp_exporter) - ) + _tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter)) logger.info(f"OTLP tracing enabled: {otlp_endpoint}") - + # Add console exporter for development if os.getenv("TRACING_CONSOLE", "false").lower() == "true": - _tracer_provider.add_span_processor( - BatchSpanProcessor(ConsoleSpanExporter()) - ) + _tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter())) logger.info("Console span exporter enabled") - + # Set the global tracer provider trace.set_tracer_provider(_tracer_provider) - + # Create and return tracer _tracer = trace.get_tracer(service_name) - + logger.info(f"Tracing initialized for service: {service_name}") - + return _tracer @@ -130,19 +126,19 @@ def init_tracing( ) -> trace.Tracer: """ Initialize tracing with automatic instrumentation. - + Args: app: FastAPI application instance (optional) service_name: Name of the service instrument_fastapi: Whether to instrument FastAPI instrument_httpx: Whether to instrument HTTPX instrument_redis: Whether to instrument Redis - + Returns: Configured tracer instance """ tracer = get_tracer(service_name) - + # Instrument FastAPI if app provided if app and instrument_fastapi: try: @@ -150,7 +146,7 @@ def init_tracing( logger.info("FastAPI instrumentation enabled") except Exception as e: logger.warning(f"Failed to instrument FastAPI: {e}") - + # Instrument HTTPX if instrument_httpx: try: @@ -158,7 +154,7 @@ def init_tracing( logger.info("HTTPX instrumentation enabled") except Exception as e: logger.warning(f"Failed to instrument HTTPX: {e}") - + # Instrument Redis if instrument_redis: try: @@ -166,17 +162,17 @@ def init_tracing( logger.info("Redis instrumentation enabled") except Exception as e: logger.warning(f"Failed to instrument Redis: {e}") - + return tracer def extract_trace_context(carrier: dict) -> Context: """ Extract trace context from carrier (e.g., HTTP headers). - + Args: carrier: Dictionary containing trace context (e.g., HTTP headers) - + Returns: Extracted context """ @@ -186,10 +182,10 @@ def extract_trace_context(carrier: dict) -> Context: def inject_trace_context(carrier: dict) -> dict: """ Inject trace context into carrier (e.g., HTTP headers). - + Args: carrier: Dictionary to inject trace context into - + Returns: Carrier with injected trace context """ @@ -204,17 +200,17 @@ def create_span( ) -> trace.Span: """ Create a new span with the given name and context. - + Args: name: Name of the span context: Parent context (optional) kind: Span kind - + Returns: New span """ tracer = get_tracer() - + if context: with tracer.start_as_current_span(name, context=context, kind=kind) as span: return span @@ -226,7 +222,7 @@ def create_span( def add_span_attributes(span: trace.Span, attributes: dict) -> None: """ Add attributes to a span. - + Args: span: Span to add attributes to attributes: Dictionary of attributes @@ -239,7 +235,7 @@ def add_span_attributes(span: trace.Span, attributes: dict) -> None: def record_span_exception(span: trace.Span, exception: Exception) -> None: """ Record an exception on a span. - + Args: span: Span to record exception on exception: Exception to record @@ -251,7 +247,7 @@ def record_span_exception(span: trace.Span, exception: Exception) -> None: def shutdown_tracing() -> None: """Shutdown the tracing provider and flush any pending spans.""" global _tracer_provider - + if _tracer_provider: _tracer_provider.shutdown() logger.info("Tracing provider shutdown") @@ -260,28 +256,28 @@ def shutdown_tracing() -> None: class TracingMiddleware: """ Middleware for FastAPI to handle trace context propagation. - + This middleware extracts trace context from incoming requests and injects it into outgoing requests. """ - + def __init__(self, app): self.app = app - + async def __call__(self, scope, receive, send): if scope["type"] != "http": await self.app(scope, receive, send) return - + # Extract trace context from headers headers = dict(scope.get("headers", [])) # Convert bytes to string for headers headers = {k.decode(): v.decode() for k, v in headers.items()} - + # The FastAPI instrumentation will handle this automatically, # but we keep this for custom use cases context = extract_trace_context(headers) - + # Continue with the request await self.app(scope, receive, send) @@ -289,7 +285,7 @@ async def __call__(self, scope, receive, send): def get_current_span() -> Optional[trace.Span]: """ Get the current active span if any. - + Returns: Current span or None """ @@ -299,26 +295,26 @@ def get_current_span() -> Optional[trace.Span]: def get_trace_id() -> Optional[str]: """ Get the current trace ID as a hex string. - + Returns: Trace ID or None """ span = get_current_span() if span: trace_id = span.get_span_context().trace_id - return format(trace_id, '032x') + return format(trace_id, "032x") return None def get_span_id() -> Optional[str]: """ Get the current span ID as a hex string. - + Returns: Span ID or None """ span = get_current_span() if span: span_id = span.get_span_context().span_id - return format(span_id, '016x') + return format(span_id, "016x") return None diff --git a/ai-engine/utils/logging_config.py b/ai-engine/utils/logging_config.py index c6b09ef0..dec5f409 100644 --- a/ai-engine/utils/logging_config.py +++ b/ai-engine/utils/logging_config.py @@ -27,7 +27,9 @@ import json # Context variable for correlation ID -correlation_id_var: ContextVar[Optional[str]] = ContextVar("correlation_id", default=None) +correlation_id_var: ContextVar[Optional[str]] = ContextVar( + "correlation_id", default=None +) def configure_structlog( @@ -38,7 +40,7 @@ def configure_structlog( ): """ Configure structlog for the AI engine. - + Args: log_level: Logging level (DEBUG, INFO, WARNING, ERROR) log_file: Path to log file (optional) @@ -47,16 +49,16 @@ def configure_structlog( """ if log_level is None: log_level = os.getenv("LOG_LEVEL", "INFO").upper() - + # Auto-detect JSON format in production if json_format is None: json_format = os.getenv("LOG_JSON_FORMAT", "false").lower() == "true" if os.getenv("ENVIRONMENT", "development") == "production": json_format = True - + # Get log directory log_dir = os.getenv("LOG_DIR", "/tmp/modporter-ai/logs") - + # Configure processors based on format # Order matters: context merging -> logger info -> level -> timestamper -> renderer -> exception handling processors = [ @@ -68,14 +70,14 @@ def configure_structlog( structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, ] - + if debug_mode: processors.append(structlog.dev.ConsoleRenderer()) elif json_format: processors.append(structlog.processors.JSONRenderer()) else: processors.append(structlog.dev.ConsoleRenderer(colors=False)) - + # Configure structlog structlog.configure( processors=processors, @@ -84,160 +86,174 @@ def configure_structlog( logger_factory=structlog.stdlib.LoggerFactory(), cache_logger_on_first_use=True, ) - + # Also configure standard library logging root_logger = logging.getLogger() root_logger.setLevel(getattr(logging, log_level, logging.INFO)) - + # Clear existing handlers root_logger.handlers.clear() - + # Console handler console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(getattr(logging, log_level, logging.INFO)) - console_handler.setFormatter(logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s", - datefmt="%Y-%m-%d %H:%M:%S" - )) + console_handler.setFormatter( + logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + ) root_logger.addHandler(console_handler) - + # File handler for production if log_file is None: os.makedirs(log_dir, exist_ok=True) log_file = os.path.join(log_dir, "ai-engine.log") - + file_handler = logging.handlers.RotatingFileHandler( - log_file, - maxBytes=10 * 1024 * 1024, # 10MB - backupCount=5, - encoding='utf-8' + log_file, maxBytes=10 * 1024 * 1024, backupCount=5, encoding="utf-8" # 10MB ) file_handler.setLevel(logging.INFO) - file_handler.setFormatter(logging.Formatter( - "%(message)s" - )) + file_handler.setFormatter(logging.Formatter("%(message)s")) root_logger.addHandler(file_handler) - + return structlog.get_logger() class AgentLogFormatter(logging.Formatter): """Custom formatter for agent logging with structured output""" - + def __init__(self, include_agent_context: bool = True): self.include_agent_context = include_agent_context super().__init__() - + def format(self, record: logging.LogRecord) -> str: # Base format with timestamp, level, and logger name - timestamp = datetime.fromtimestamp(record.created).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] - + timestamp = datetime.fromtimestamp(record.created).strftime( + "%Y-%m-%d %H:%M:%S.%f" + )[:-3] + # Extract agent name from logger name (e.g., 'agents.java_analyzer' -> 'JavaAnalyzer') - logger_parts = record.name.split('.') - if 'agents' in logger_parts: - agent_idx = logger_parts.index('agents') + logger_parts = record.name.split(".") + if "agents" in logger_parts: + agent_idx = logger_parts.index("agents") if agent_idx + 1 < len(logger_parts): - agent_name = logger_parts[agent_idx + 1].replace('_', '').title() + agent_name = logger_parts[agent_idx + 1].replace("_", "").title() else: - agent_name = 'Agent' - elif 'crew' in logger_parts: - agent_name = 'Crew' + agent_name = "Agent" + elif "crew" in logger_parts: + agent_name = "Crew" else: - agent_name = record.name.split('.')[-1].title() - + agent_name = record.name.split(".")[-1].title() + # Build the log message - base_msg = f"{timestamp} [{record.levelname}] {agent_name}: {record.getMessage()}" - + base_msg = ( + f"{timestamp} [{record.levelname}] {agent_name}: {record.getMessage()}" + ) + # Add extra context if available - if hasattr(record, 'agent_context') and self.include_agent_context: + if hasattr(record, "agent_context") and self.include_agent_context: context = record.agent_context if isinstance(context, dict): - context_str = json.dumps(context, separators=(',', ':')) + context_str = json.dumps(context, separators=(",", ":")) base_msg += f" | Context: {context_str}" - + # Add operation timing if available - if hasattr(record, 'operation_time'): + if hasattr(record, "operation_time"): base_msg += f" | Duration: {record.operation_time:.3f}s" - + # Add tool usage information if available - if hasattr(record, 'tool_name'): + if hasattr(record, "tool_name"): base_msg += f" | Tool: {record.tool_name}" - if hasattr(record, 'tool_result'): + if hasattr(record, "tool_result"): result_preview = str(record.tool_result)[:100] if len(str(record.tool_result)) > 100: result_preview += "..." base_msg += f" | Result: {result_preview}" - + return base_msg class AgentLogger: """Enhanced logger for agent operations with structured logging""" - + def __init__(self, name: str): self.logger = logging.getLogger(name) - self.agent_name = name.split('.')[-1].replace('_', '').title() - + self.agent_name = name.split(".")[-1].replace("_", "").title() + def info(self, message: str, **kwargs): """Log info message with optional context""" extra = self._build_extra(**kwargs) self.logger.info(message, extra=extra) - + def debug(self, message: str, **kwargs): """Log debug message with optional context""" extra = self._build_extra(**kwargs) self.logger.debug(message, extra=extra) - + def warning(self, message: str, **kwargs): """Log warning message with optional context""" extra = self._build_extra(**kwargs) self.logger.warning(message, extra=extra) - + def error(self, message: str, **kwargs): """Log error message with optional context""" extra = self._build_extra(**kwargs) self.logger.error(message, extra=extra) - + def log_operation_start(self, operation: str, **context): """Log the start of an operation""" self.info(f"Starting {operation}", agent_context=context) - + def log_operation_complete(self, operation: str, duration: float, **context): """Log the completion of an operation with timing""" - self.info(f"Completed {operation}", operation_time=duration, agent_context=context) - - def log_tool_usage(self, tool_name: str, result: Any = None, duration: Optional[float] = None): + self.info( + f"Completed {operation}", operation_time=duration, agent_context=context + ) + + def log_tool_usage( + self, tool_name: str, result: Any = None, duration: Optional[float] = None + ): """Log tool usage with results""" - kwargs = {'tool_name': tool_name} + kwargs = {"tool_name": tool_name} if result is not None: - kwargs['tool_result'] = result + kwargs["tool_result"] = result if duration is not None: - kwargs['operation_time'] = duration - + kwargs["operation_time"] = duration + self.info(f"Used tool: {tool_name}", **kwargs) - + def log_agent_decision(self, decision: str, reasoning: str, **context): """Log agent decision-making process""" log_context = context.copy() - log_context['decision'] = decision - log_context['reasoning'] = reasoning + log_context["decision"] = decision + log_context["reasoning"] = reasoning self.info(f"Decision: {decision}", agent_context=log_context) - - def log_data_transfer(self, from_agent: str, to_agent: str, data_type: str, data_size: Optional[int] = None): + + def log_data_transfer( + self, + from_agent: str, + to_agent: str, + data_type: str, + data_size: Optional[int] = None, + ): """Log data transfer between agents""" context = { - 'from_agent': from_agent, - 'to_agent': to_agent, - 'data_type': data_type + "from_agent": from_agent, + "to_agent": to_agent, + "data_type": data_type, } if data_size is not None: - context['data_size'] = data_size - - self.info(f"Data transfer: {data_type} from {from_agent} to {to_agent}", agent_context=context) - + context["data_size"] = data_size + + self.info( + f"Data transfer: {data_type} from {from_agent} to {to_agent}", + agent_context=context, + ) + def _build_extra(self, **kwargs) -> Dict[str, Any]: """Build extra dictionary for logging""" - valid_keys = {'agent_context', 'operation_time', 'tool_name', 'tool_result'} + valid_keys = {"agent_context", "operation_time", "tool_name", "tool_result"} extra = {key: value for key, value in kwargs.items() if key in valid_keys} return extra @@ -248,11 +264,11 @@ def setup_logging( enable_file_logging: bool = True, debug_mode: bool = False, max_file_size: int = 10 * 1024 * 1024, # 10MB - backup_count: int = 5 + backup_count: int = 5, ) -> None: """ Setup centralized logging configuration for the AI engine - + Args: log_level: Logging level (DEBUG, INFO, WARNING, ERROR) log_file: Path to log file (optional) @@ -261,30 +277,30 @@ def setup_logging( max_file_size: Maximum size of log file before rotation backup_count: Number of backup log files to keep """ - + # Determine log level if log_level is None: log_level = os.getenv("LOG_LEVEL", "DEBUG" if debug_mode else "INFO").upper() - + # Convert string level to logging constant numeric_level = getattr(logging, log_level, logging.INFO) - + # Create custom formatter formatter = AgentLogFormatter(include_agent_context=debug_mode) - + # Configure root logger root_logger = logging.getLogger() root_logger.setLevel(numeric_level) - + # Clear existing handlers root_logger.handlers.clear() - + # Console handler console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(numeric_level) console_handler.setFormatter(formatter) root_logger.addHandler(console_handler) - + # File handler (if enabled) if enable_file_logging: if log_file is None: @@ -292,36 +308,37 @@ def setup_logging( log_dir = Path(os.getenv("LOG_DIR", "/tmp/modporter-ai/logs")) log_dir.mkdir(parents=True, exist_ok=True) log_file = log_dir / "ai-engine.log" - + # Use rotating file handler to prevent huge log files file_handler = logging.handlers.RotatingFileHandler( - log_file, - maxBytes=max_file_size, - backupCount=backup_count, - encoding='utf-8' + log_file, maxBytes=max_file_size, backupCount=backup_count, encoding="utf-8" ) file_handler.setLevel(numeric_level) file_handler.setFormatter(formatter) root_logger.addHandler(file_handler) - + # Set specific logger levels for third-party libraries to reduce noise logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING) - logging.getLogger("crewai").setLevel(logging.INFO if debug_mode else logging.WARNING) - + logging.getLogger("crewai").setLevel( + logging.INFO if debug_mode else logging.WARNING + ) + # Log the configuration logger = logging.getLogger(__name__) - logger.info(f"Logging configured - Level: {log_level}, Debug: {debug_mode}, File: {enable_file_logging}") + logger.info( + f"Logging configured - Level: {log_level}, Debug: {debug_mode}, File: {enable_file_logging}" + ) def get_agent_logger(agent_name: str) -> AgentLogger: """ Get a configured logger for an agent - + Args: agent_name: Name of the agent (e.g., 'java_analyzer', 'logic_translator') - + Returns: AgentLogger instance """ @@ -332,10 +349,10 @@ def get_agent_logger(agent_name: str) -> AgentLogger: def get_structlog_logger(name: str = None) -> structlog.BoundLogger: """ Get a structlog logger instance. - + Args: name: Logger name (optional) - + Returns: Configured structlog logger """ @@ -347,16 +364,16 @@ def get_structlog_logger(name: str = None) -> structlog.BoundLogger: def set_correlation_id(correlation_id: Optional[str] = None) -> str: """ Set the correlation ID for the current context. - + Args: correlation_id: Optional correlation ID to use - + Returns: The correlation ID (either provided or generated) """ if correlation_id is None: correlation_id = str(uuid.uuid4()) - + correlation_id_var.set(correlation_id) structlog.contextvars.clear_contextvars() structlog.contextvars.bind_contextvars(correlation_id=correlation_id) @@ -366,7 +383,7 @@ def set_correlation_id(correlation_id: Optional[str] = None) -> str: def get_correlation_id() -> Optional[str]: """ Get the current correlation ID from the context. - + Returns: Current correlation ID or None """ @@ -389,20 +406,21 @@ def get_crew_logger() -> AgentLogger: # Performance timing decorator def log_performance(operation_name: str = None): """Decorator to log operation performance""" + def decorator(func): def wrapper(*args, **kwargs): - + # Get logger from the class instance if available - if args and hasattr(args[0], 'logger'): + if args and hasattr(args[0], "logger"): logger = args[0].logger else: - logger = get_agent_logger(func.__module__.split('.')[-1]) - + logger = get_agent_logger(func.__module__.split(".")[-1]) + op_name = operation_name or func.__name__ start_time = time.time() - + logger.log_operation_start(op_name) - + try: result = func(*args, **kwargs) duration = time.time() - start_time @@ -410,10 +428,13 @@ def wrapper(*args, **kwargs): return result except Exception as e: duration = time.time() - start_time - logger.error(f"Operation {op_name} failed after {duration:.3f}s: {str(e)}") + logger.error( + f"Operation {op_name} failed after {duration:.3f}s: {str(e)}" + ) raise - + return wrapper + return decorator @@ -422,12 +443,15 @@ def wrapper(*args, **kwargs): # ============================================================================ # Context variable for tracking operation context across async boundaries -_operation_context: ContextVar[Dict[str, Any]] = ContextVar('operation_context', default={}) +_operation_context: ContextVar[Dict[str, Any]] = ContextVar( + "operation_context", default={} +) @dataclass class AgentDecision: """Represents an agent decision for logging and analysis""" + agent_name: str decision_type: str decision: str @@ -442,6 +466,7 @@ class AgentDecision: @dataclass class ToolUsage: """Represents tool usage for logging and analysis""" + agent_name: str tool_name: str input_params: Dict[str, Any] @@ -457,32 +482,32 @@ class AgentLogAnalyzer: Log analysis tools for agent operations. Provides insights into agent behavior, decisions, and performance. """ - + def __init__(self): self.decisions: List[AgentDecision] = [] self.tool_usages: List[ToolUsage] = [] self._lock = threading.Lock() - + def record_decision(self, decision: AgentDecision): """Record an agent decision for analysis""" with self._lock: self.decisions.append(decision) - + def record_tool_usage(self, usage: ToolUsage): """Record tool usage for analysis""" with self._lock: self.tool_usages.append(usage) - + def get_agent_statistics(self, agent_name: str = None) -> Dict[str, Any]: """Get statistics for a specific agent or all agents""" with self._lock: decisions = self.decisions tool_usages = self.tool_usages - + if agent_name: decisions = [d for d in decisions if d.agent_name == agent_name] tool_usages = [t for t in tool_usages if t.agent_name == agent_name] - + # Calculate decision statistics decision_types = defaultdict(int) avg_confidence = 0.0 @@ -491,79 +516,89 @@ def get_agent_statistics(self, agent_name: str = None) -> Dict[str, Any]: decision_types[d.decision_type] += 1 if d.confidence is not None: avg_confidence += d.confidence - avg_confidence /= len([d for d in decisions if d.confidence is not None]) if decisions else 1 - + avg_confidence /= ( + len([d for d in decisions if d.confidence is not None]) + if decisions + else 1 + ) + # Calculate tool usage statistics tool_counts = defaultdict(int) - tool_success_rate = defaultdict(lambda: {'success': 0, 'total': 0}) + tool_success_rate = defaultdict(lambda: {"success": 0, "total": 0}) avg_tool_duration = defaultdict(list) - + for t in tool_usages: tool_counts[t.tool_name] += 1 - tool_success_rate[t.tool_name]['total'] += 1 + tool_success_rate[t.tool_name]["total"] += 1 if t.success: - tool_success_rate[t.tool_name]['success'] += 1 + tool_success_rate[t.tool_name]["success"] += 1 avg_tool_duration[t.tool_name].append(t.duration_ms) - + # Calculate average durations avg_durations = {} for tool, durations in avg_tool_duration.items(): avg_durations[tool] = sum(durations) / len(durations) if durations else 0 - + # Calculate success rates success_rates = {} for tool, counts in tool_success_rate.items(): - success_rates[tool] = counts['success'] / counts['total'] if counts['total'] > 0 else 0 - + success_rates[tool] = ( + counts["success"] / counts["total"] if counts["total"] > 0 else 0 + ) + return { - 'agent_name': agent_name or 'all', - 'total_decisions': len(decisions), - 'total_tool_usages': len(tool_usages), - 'decision_types': dict(decision_types), - 'average_confidence': avg_confidence, - 'tool_usage_counts': dict(tool_counts), - 'tool_success_rates': success_rates, - 'average_tool_durations_ms': avg_durations, + "agent_name": agent_name or "all", + "total_decisions": len(decisions), + "total_tool_usages": len(tool_usages), + "decision_types": dict(decision_types), + "average_confidence": avg_confidence, + "tool_usage_counts": dict(tool_counts), + "tool_success_rates": success_rates, + "average_tool_durations_ms": avg_durations, } - - def get_decision_trace(self, agent_name: str = None, limit: int = 100) -> List[Dict]: + + def get_decision_trace( + self, agent_name: str = None, limit: int = 100 + ) -> List[Dict]: """Get a trace of recent decisions""" with self._lock: decisions = self.decisions.copy() - + if agent_name: decisions = [d for d in decisions if d.agent_name == agent_name] - + # Sort by timestamp (most recent first) and limit decisions = sorted(decisions, key=lambda d: d.timestamp, reverse=True)[:limit] - + return [asdict(d) for d in decisions] - - def get_tool_usage_trace(self, agent_name: str = None, limit: int = 100) -> List[Dict]: + + def get_tool_usage_trace( + self, agent_name: str = None, limit: int = 100 + ) -> List[Dict]: """Get a trace of recent tool usages""" with self._lock: usages = self.tool_usages.copy() - + if agent_name: usages = [u for u in usages if u.agent_name == agent_name] - + # Sort by timestamp (most recent first) and limit usages = sorted(usages, key=lambda u: u.timestamp, reverse=True)[:limit] - + return [asdict(u) for u in usages] - + def export_analysis(self, filepath: str): """Export analysis data to JSON file""" data = { - 'export_timestamp': datetime.now().isoformat(), - 'statistics': self.get_agent_statistics(), - 'decision_trace': self.get_decision_trace(limit=1000), - 'tool_usage_trace': self.get_tool_usage_trace(limit=1000), + "export_timestamp": datetime.now().isoformat(), + "statistics": self.get_agent_statistics(), + "decision_trace": self.get_decision_trace(limit=1000), + "tool_usage_trace": self.get_tool_usage_trace(limit=1000), } - - with open(filepath, 'w') as f: + + with open(filepath, "w") as f: json.dump(data, f, indent=2, default=str) - + def clear(self): """Clear all recorded data""" with self._lock: @@ -586,7 +621,7 @@ def get_log_analyzer() -> AgentLogAnalyzer: class EnhancedAgentLogger(AgentLogger): """ Enhanced agent logger with comprehensive logging capabilities. - + Features: - Structured logging with context - Decision and reasoning logging @@ -594,19 +629,21 @@ class EnhancedAgentLogger(AgentLogger): - Debug mode with verbose output - Integration with log analyzer """ - + def __init__(self, name: str, debug_mode: bool = False): super().__init__(name) - self._debug_mode = debug_mode or os.getenv("AGENT_DEBUG_MODE", "false").lower() == "true" + self._debug_mode = ( + debug_mode or os.getenv("AGENT_DEBUG_MODE", "false").lower() == "true" + ) self._analyzer = get_log_analyzer() self._operation_stack: List[Dict[str, Any]] = [] self._lock = threading.Lock() - + def set_debug_mode(self, enabled: bool): """Enable or disable debug mode""" self._debug_mode = enabled self.debug(f"Debug mode {'enabled' if enabled else 'disabled'}") - + def log_decision( self, decision_type: str, @@ -614,11 +651,11 @@ def log_decision( reasoning: str, confidence: float = None, alternatives: List[str] = None, - **context + **context, ): """ Log an agent decision with reasoning. - + Args: decision_type: Type of decision (e.g., 'feature_mapping', 'code_translation') decision: The decision made @@ -637,33 +674,31 @@ def log_decision( alternatives_considered=alternatives or [], context=context, ) - + # Record for analysis self._analyzer.record_decision(decision_record) - + # Log the decision log_context = { - 'decision_type': decision_type, - 'confidence': confidence, - 'alternatives_count': len(alternatives) if alternatives else 0, - **context + "decision_type": decision_type, + "confidence": confidence, + "alternatives_count": len(alternatives) if alternatives else 0, + **context, } - + if self._debug_mode: # Verbose output in debug mode self.info( - f"DECISION [{decision_type}]: {decision}", - agent_context=log_context + f"DECISION [{decision_type}]: {decision}", agent_context=log_context ) self.debug(f" Reasoning: {reasoning}") if alternatives: self.debug(f" Alternatives considered: {alternatives}") else: self.info( - f"Decision: {decision_type} -> {decision}", - agent_context=log_context + f"Decision: {decision_type} -> {decision}", agent_context=log_context ) - + def log_tool_call( self, tool_name: str, @@ -671,11 +706,11 @@ def log_tool_call( result: Any = None, success: bool = True, duration_ms: float = 0, - error: str = None + error: str = None, ): """ Log a tool call with input/output details. - + Args: tool_name: Name of the tool called input_params: Parameters passed to the tool @@ -689,40 +724,42 @@ def log_tool_call( agent_name=self.agent_name, tool_name=tool_name, input_params=input_params, - output_result=result if not isinstance(result, str) else result[:1000], # Truncate long strings + output_result=( + result if not isinstance(result, str) else result[:1000] + ), # Truncate long strings success=success, duration_ms=duration_ms, error_message=error, ) - + # Record for analysis self._analyzer.record_tool_usage(usage_record) - + # Log the tool call if success: self.info( f"Tool call: {tool_name} ({duration_ms:.1f}ms)", tool_name=tool_name, - operation_time=duration_ms / 1000 + operation_time=duration_ms / 1000, ) else: self.error( f"Tool call failed: {tool_name} - {error}", tool_name=tool_name, - operation_time=duration_ms / 1000 + operation_time=duration_ms / 1000, ) - + # Debug mode: log input/output details if self._debug_mode: self.debug(f" Tool input: {json.dumps(input_params, default=str)[:500]}") if success and result is not None: result_str = str(result)[:500] self.debug(f" Tool output: {result_str}") - + def log_reasoning_step(self, step: str, details: str = None): """ Log a reasoning step in the agent's thought process. - + Args: step: Name/description of the reasoning step details: Additional details about the step @@ -731,11 +768,13 @@ def log_reasoning_step(self, step: str, details: str = None): self.debug(f"Reasoning: {step}") if details: self.debug(f" Details: {details}") - - def log_state_change(self, state_name: str, old_value: Any, new_value: Any, reason: str = None): + + def log_state_change( + self, state_name: str, old_value: Any, new_value: Any, reason: str = None + ): """ Log a state change in the agent. - + Args: state_name: Name of the state variable old_value: Previous value @@ -743,89 +782,95 @@ def log_state_change(self, state_name: str, old_value: Any, new_value: Any, reas reason: Reason for the change """ context = { - 'state_name': state_name, - 'old_value': str(old_value)[:200], - 'new_value': str(new_value)[:200], + "state_name": state_name, + "old_value": str(old_value)[:200], + "new_value": str(new_value)[:200], } if reason: - context['reason'] = reason - + context["reason"] = reason + self.debug(f"State change: {state_name}", agent_context=context) - + def push_operation(self, operation: str, **context): """Push an operation onto the operation stack""" with self._lock: - self._operation_stack.append({ - 'operation': operation, - 'context': context, - 'start_time': time.time(), - }) + self._operation_stack.append( + { + "operation": operation, + "context": context, + "start_time": time.time(), + } + ) self.debug(f"Started operation: {operation}") - + def pop_operation(self, success: bool = True, error: str = None): """Pop an operation from the stack and log its completion""" with self._lock: if not self._operation_stack: self.warning("Attempted to pop operation from empty stack") return - + op = self._operation_stack.pop() - - duration = time.time() - op['start_time'] - operation = op['operation'] - + + duration = time.time() - op["start_time"] + operation = op["operation"] + if success: self.info(f"Completed operation: {operation} ({duration:.3f}s)") else: self.error(f"Failed operation: {operation} ({duration:.3f}s) - {error}") - + def get_current_operation(self) -> Optional[str]: """Get the current operation name""" with self._lock: if self._operation_stack: - return self._operation_stack[-1]['operation'] + return self._operation_stack[-1]["operation"] return None - + def log_error_with_trace(self, message: str, error: Exception = None): """ Log an error with full stack trace. - + Args: message: Error message error: Exception object (optional) """ self.error(message) - + if error and self._debug_mode: # Log full stack trace in debug mode trace = traceback.format_exc() self.debug(f"Stack trace:\n{trace}") - + def log_performance_metric(self, metric_name: str, value: float, unit: str = None): """ Log a performance metric. - + Args: metric_name: Name of the metric value: Metric value unit: Unit of measurement """ context = { - 'metric_name': metric_name, - 'metric_value': value, - 'metric_unit': unit, + "metric_name": metric_name, + "metric_value": value, + "metric_unit": unit, } - self.debug(f"Performance: {metric_name} = {value}{unit or ''}", agent_context=context) + self.debug( + f"Performance: {metric_name} = {value}{unit or ''}", agent_context=context + ) -def get_enhanced_agent_logger(agent_name: str, debug_mode: bool = False) -> EnhancedAgentLogger: +def get_enhanced_agent_logger( + agent_name: str, debug_mode: bool = False +) -> EnhancedAgentLogger: """ Get an enhanced agent logger with comprehensive logging capabilities. - + Args: agent_name: Name of the agent debug_mode: Enable debug mode for verbose output - + Returns: EnhancedAgentLogger instance """ @@ -846,31 +891,33 @@ def disable_global_debug_mode(): def export_log_analysis(filepath: str = None): """ Export log analysis to a JSON file. - + Args: filepath: Path to export file (optional, defaults to log directory) """ if filepath is None: log_dir = Path(os.getenv("LOG_DIR", "/tmp/modporter-ai/logs")) log_dir.mkdir(parents=True, exist_ok=True) - filepath = log_dir / f"agent_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" - + filepath = ( + log_dir / f"agent_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" + ) + analyzer = get_log_analyzer() analyzer.export_analysis(str(filepath)) - + logger = logging.getLogger(__name__) logger.info(f"Log analysis exported to: {filepath}") - + return filepath def get_agent_log_summary(agent_name: str = None) -> Dict[str, Any]: """ Get a summary of agent logging activity. - + Args: agent_name: Specific agent name (optional, returns all if not specified) - + Returns: Dictionary with agent statistics """ diff --git a/backend/src/main.py b/backend/src/main.py index 70882cb7..2ed0c673 100644 --- a/backend/src/main.py +++ b/backend/src/main.py @@ -102,13 +102,13 @@ async def lifespan(app: FastAPI): if testing_env != "true": await init_db() logger.info("Database initialized") - + # Initialize tracing init_tracing(app=app, service_name="modporter-backend") logger.info("Distributed tracing initialized") - + yield - + # Shutdown shutdown_tracing() logger.info("Application shutdown") @@ -204,21 +204,31 @@ async def shutdown_event(): # Include API routers -app.include_router(performance.router, prefix="/api/v1/performance", tags=["performance"]) -app.include_router(behavioral_testing.router, prefix="/api/v1", tags=["behavioral-testing"]) +app.include_router( + performance.router, prefix="/api/v1/performance", tags=["performance"] +) +app.include_router( + behavioral_testing.router, prefix="/api/v1", tags=["behavioral-testing"] +) app.include_router(validation.router, prefix="/api/v1/validation", tags=["validation"]) app.include_router(comparison.router, prefix="/api/v1/comparison", tags=["comparison"]) app.include_router(embeddings.router, prefix="/api/v1/embeddings", tags=["embeddings"]) app.include_router(feedback.router, prefix="/api/v1", tags=["feedback"]) -app.include_router(experiments.router, prefix="/api/v1/experiments", tags=["experiments"]) +app.include_router( + experiments.router, prefix="/api/v1/experiments", tags=["experiments"] +) app.include_router(behavior_files.router, prefix="/api/v1", tags=["behavior-files"]) -app.include_router(behavior_templates.router, prefix="/api/v1", tags=["behavior-templates"]) +app.include_router( + behavior_templates.router, prefix="/api/v1", tags=["behavior-templates"] +) app.include_router(behavior_export.router, prefix="/api/v1", tags=["behavior-export"]) app.include_router(advanced_events.router, prefix="/api/v1", tags=["advanced-events"]) app.include_router(conversions.router) # Conversions API + WebSocket app.include_router(mod_imports.router, prefix="/api/v1/mods", tags=["mod-imports"]) app.include_router(analytics.router, prefix="/api/v1/analytics", tags=["analytics"]) -app.include_router(rate_limit_dashboard_router, prefix="/api/v1/rate-limit", tags=["rate-limiting"]) +app.include_router( + rate_limit_dashboard_router, prefix="/api/v1/rate-limit", tags=["rate-limiting"] +) # Health check endpoints (no prefix - used for Kubernetes probes) app.include_router(health.router) @@ -239,7 +249,9 @@ class ConversionRequest(BaseModel): target_version: str = Field( default="1.20.0", description="Target Minecraft version for the conversion." ) - options: Optional[dict] = Field(default=None, description="Optional conversion settings.") + options: Optional[dict] = Field( + default=None, description="Optional conversion settings." + ) @property def resolved_file_id(self) -> str: @@ -254,8 +266,12 @@ def resolved_original_name(self) -> str: class UploadResponse(BaseModel): """Response model for file upload""" - file_id: str = Field(..., description="Unique identifier assigned to the uploaded file.") - original_filename: str = Field(..., description="The original name of the uploaded file.") + file_id: str = Field( + ..., description="Unique identifier assigned to the uploaded file." + ) + original_filename: str = Field( + ..., description="The original name of the uploaded file." + ) saved_filename: str = Field( ..., description="The name under which the file is saved on the server (job_id + extension).", @@ -265,7 +281,9 @@ class UploadResponse(BaseModel): default=None, description="Detected content type of the uploaded file." ) message: str = Field(..., description="Status message confirming the upload.") - filename: str = Field(..., description="The uploaded filename (matches original_filename)") + filename: str = Field( + ..., description="The uploaded filename (matches original_filename)" + ) class ConversionResponse(BaseModel): @@ -420,12 +438,14 @@ async def simulate_ai_conversion(job_id: str): logger.error(f"Error: Job {job_id} not found for AI simulation.") return - original_mod_name = job.input_data.get("original_filename", "ConvertedAddon").split( - "." - )[0] + original_mod_name = job.input_data.get( + "original_filename", "ConvertedAddon" + ).split(".")[0] # Attempt to get user_id from job input_data, fall back to a default if not found # This field might not exist in older job records. - user_id_for_addon = job.input_data.get("user_id", conversion_parser.DEFAULT_USER_ID) + user_id_for_addon = job.input_data.get( + "user_id", conversion_parser.DEFAULT_USER_ID + ) def mirror_dict_from_job( current_job, progress_val=None, result_url=None, error_message=None @@ -438,7 +458,9 @@ def mirror_dict_from_job( progress=( progress_val if progress_val is not None - else (current_job.progress.progress if current_job.progress else 0) + else ( + current_job.progress.progress if current_job.progress else 0 + ) ), target_version=current_job.input_data.get("target_version"), options=current_job.input_data.get("options"), @@ -465,13 +487,17 @@ def mirror_dict_from_job( ) # Stage 2: Preprocessing -> Processing - job = await crud.update_job_status(session, PyUUID(job_id), "processing") + job = await crud.update_job_status( + session, PyUUID(job_id), "processing" + ) await crud.upsert_progress(session, PyUUID(job_id), 25) mirror = mirror_dict_from_job(job, 25) conversion_jobs_db[job_id] = mirror # Keep legacy mirror for now await cache.set_job_status(job_id, mirror.model_dump()) await cache.set_progress(job_id, 25) - logger.info(f"Job {job_id}: Status updated to {job.status}, Progress: 25%") + logger.info( + f"Job {job_id}: Status updated to {job.status}, Progress: 25%" + ) # Stage 3: BedrockArchitectAgent (25-50%) await ProgressHandler.broadcast_agent_start( @@ -497,13 +523,17 @@ def mirror_dict_from_job( if job.status == "cancelled": logger.info(f"Job {job_id} was cancelled. Stopping AI simulation.") return - job = await crud.update_job_status(session, PyUUID(job_id), "postprocessing") + job = await crud.update_job_status( + session, PyUUID(job_id), "postprocessing" + ) await crud.upsert_progress(session, PyUUID(job_id), 50) mirror = mirror_dict_from_job(job, 50) conversion_jobs_db[job_id] = mirror await cache.set_job_status(job_id, mirror.model_dump()) await cache.set_progress(job_id, 50) - logger.info(f"Job {job_id}: Status updated to {job.status}, Progress: 50%") + logger.info( + f"Job {job_id}: Status updated to {job.status}, Progress: 50%" + ) # Stage 5: LogicTranslatorAgent (50-75%) await ProgressHandler.broadcast_agent_start( @@ -622,7 +652,9 @@ def mirror_dict_from_job( f, ) # Dummy block behavior - with open(os.path.join(bp_dir, "blocks", "simulated_block.json"), "w") as f: + with open( + os.path.join(bp_dir, "blocks", "simulated_block.json"), "w" + ) as f: json.dump( { "minecraft:block": { @@ -635,7 +667,9 @@ def mirror_dict_from_job( f, ) # Dummy recipe - with open(os.path.join(bp_dir, "recipes", "simulated_recipe.json"), "w") as f: + with open( + os.path.join(bp_dir, "recipes", "simulated_recipe.json"), "w" + ) as f: json.dump( { "minecraft:recipe_shaped": { @@ -666,7 +700,9 @@ def mirror_dict_from_job( ) # Save Addon, Blocks, Recipes (assets list in addon_data_upload is empty) - await crud.update_addon_details(session, PyUUID(job_id), addon_data_upload) + await crud.update_addon_details( + session, PyUUID(job_id), addon_data_upload + ) logger.info( f"Job {job_id}: Addon core data (metadata, blocks, recipes) saved to DB." ) @@ -686,19 +722,27 @@ def mirror_dict_from_job( # Asset conversion integration - convert uploaded assets using AI engine try: - logger.info(f"Job {job_id}: Starting asset conversion for conversion job") + logger.info( + f"Job {job_id}: Starting asset conversion for conversion job" + ) asset_conversion_result = ( - await asset_conversion_service.convert_assets_for_conversion(job_id) + await asset_conversion_service.convert_assets_for_conversion( + job_id + ) ) if asset_conversion_result.get("success"): - converted_count = asset_conversion_result.get("converted_count", 0) + converted_count = asset_conversion_result.get( + "converted_count", 0 + ) failed_count = asset_conversion_result.get("failed_count", 0) logger.info( f"Job {job_id}: Asset conversion completed - {converted_count} converted, {failed_count} failed" ) else: - logger.warning(f"Job {job_id}: Asset conversion batch had issues") + logger.warning( + f"Job {job_id}: Asset conversion batch had issues" + ) except Exception as asset_error: logger.error(f"Job {job_id}: Asset conversion error: {asset_error}") @@ -706,7 +750,9 @@ def mirror_dict_from_job( # Original ZIP creation (can be retained or removed) os.makedirs(CONVERSION_OUTPUTS_DIR, exist_ok=True) - mock_output_filename_internal = f"{job.id}_converted.zip" # Original ZIP name + mock_output_filename_internal = ( + f"{job.id}_converted.zip" # Original ZIP name + ) mock_output_filepath = os.path.join( CONVERSION_OUTPUTS_DIR, mock_output_filename_internal ) @@ -718,7 +764,9 @@ def mirror_dict_from_job( "zip", simulated_pack_output_path, ) - logger.info(f"Job {job_id}: Original ZIP archive created at {mock_output_filepath}") + logger.info( + f"Job {job_id}: Original ZIP archive created at {mock_output_filepath}" + ) job = await crud.update_job_status(session, PyUUID(job_id), "completed") await crud.upsert_progress(session, PyUUID(job_id), 100) @@ -744,7 +792,9 @@ def mirror_dict_from_job( conversion_jobs_db[job_id] = mirror await cache.set_job_status(job_id, mirror.model_dump()) await cache.set_progress(job_id, 0) - logger.error(f"Job {job_id}: Status updated to FAILED due to error in processing.") + logger.error( + f"Job {job_id}: Status updated to FAILED due to error in processing." + ) # Broadcast failure to WebSocket clients await ProgressHandler.broadcast_conversion_failed(job_id, str(e_inner)) @@ -776,7 +826,9 @@ def mirror_dict_from_job( job_data_uuid_key = conversion_jobs_db[PyUUID(job_id)] job_data_uuid_key.status = "failed" # ... update other fields ... - await cache.set_job_status(str(PyUUID(job_id)), job_data_uuid_key.model_dump()) + await cache.set_job_status( + str(PyUUID(job_id)), job_data_uuid_key.model_dump() + ) except Exception as cache_error: logger.error( @@ -795,7 +847,9 @@ async def call_ai_engine_conversion(job_id: str): print(f"Error: Job {job_id} not found for AI Engine conversion.") return - def mirror_dict_from_job(job, progress_val=None, result_url=None, error_message=None): + def mirror_dict_from_job( + job, progress_val=None, result_url=None, error_message=None + ): # Compose dict for legacy mirror return ConversionJob( job_id=str(job.id), @@ -822,7 +876,9 @@ def mirror_dict_from_job(job, progress_val=None, result_url=None, error_message= output_path = os.path.join(CONVERSION_OUTPUTS_DIR, output_filename) # Get the input file path - input_file_path = os.path.join(TEMP_UPLOADS_DIR, f"{job.input_data.get('file_id')}.jar") + input_file_path = os.path.join( + TEMP_UPLOADS_DIR, f"{job.input_data.get('file_id')}.jar" + ) # Call AI Engine conversion_options = job.input_data.get("options", {}) @@ -834,11 +890,15 @@ def mirror_dict_from_job(job, progress_val=None, result_url=None, error_message= "conversion_options": conversion_options, } - print(f"Calling AI Engine at {AI_ENGINE_URL}/api/v1/convert with request: {ai_request}") + print( + f"Calling AI Engine at {AI_ENGINE_URL}/api/v1/convert with request: {ai_request}" + ) async with httpx.AsyncClient(timeout=600.0) as client: # 10 minute timeout # Start AI Engine conversion - response = await client.post(f"{AI_ENGINE_URL}/api/v1/convert", json=ai_request) + response = await client.post( + f"{AI_ENGINE_URL}/api/v1/convert", json=ai_request + ) if response.status_code != 200: raise Exception( @@ -854,14 +914,20 @@ def mirror_dict_from_job(job, progress_val=None, result_url=None, error_message= # Check if job was cancelled current_job = await crud.get_job(session, job_id) if current_job.status == "cancelled": - print(f"Job {job_id} was cancelled. Stopping AI Engine polling.") + print( + f"Job {job_id} was cancelled. Stopping AI Engine polling." + ) return # Get status from AI Engine - status_response = await client.get(f"{AI_ENGINE_URL}/api/v1/status/{job_id}") + status_response = await client.get( + f"{AI_ENGINE_URL}/api/v1/status/{job_id}" + ) if status_response.status_code != 200: - print(f"Failed to get AI Engine status: {status_response.status_code}") + print( + f"Failed to get AI Engine status: {status_response.status_code}" + ) continue ai_status = status_response.json() @@ -901,7 +967,9 @@ def mirror_dict_from_job(job, progress_val=None, result_url=None, error_message= ) # Verify the file exists if not os.path.exists(output_path): - print(f"Warning: Expected output file not found at {output_path}") + print( + f"Warning: Expected output file not found at {output_path}" + ) else: print(f"Job {job_id}: AI Engine conversion FAILED") @@ -1034,7 +1102,9 @@ async def get_conversion_status( descriptive_message = "Conversion completed successfully." elif status == "failed": descriptive_message = ( - f"Conversion failed: {error_message}" if error_message else "Conversion failed." + f"Conversion failed: {error_message}" + if error_message + else "Conversion failed." ) elif status == "cancelled": descriptive_message = "Job was cancelled by the user." @@ -1056,7 +1126,9 @@ async def get_conversion_status( # Fallback: load from DB job = await crud.get_job(db, job_id) if not job: - raise HTTPException(status_code=404, detail=f"Conversion job with ID '{job_id}' not found.") + raise HTTPException( + status_code=404, detail=f"Conversion job with ID '{job_id}' not found." + ) progress = job.progress.progress if job.progress else 0 error_message = None result_url = None @@ -1108,7 +1180,9 @@ async def get_conversion_status( ) -@app.get("/api/v1/conversions", response_model=List[ConversionStatus], tags=["conversion"]) +@app.get( + "/api/v1/conversions", response_model=List[ConversionStatus], tags=["conversion"] +) async def list_conversions(db: AsyncSession = Depends(get_db)): """ List all current and past conversion jobs. @@ -1169,7 +1243,9 @@ async def cancel_conversion( """ job = await crud.get_job(db, job_id) if not job: - raise HTTPException(status_code=404, detail=f"Conversion job with ID '{job_id}' not found.") + raise HTTPException( + status_code=404, detail=f"Conversion job with ID '{job_id}' not found." + ) if job.status == "cancelled": return {"message": f"Conversion job {job_id} is already cancelled."} job = await crud.update_job_status(db, job_id, "cancelled") @@ -1211,7 +1287,9 @@ async def download_converted_mod( job = conversion_jobs_db.get(job_id) if not job: - raise HTTPException(status_code=404, detail=f"Conversion job with ID '{job_id}' not found.") + raise HTTPException( + status_code=404, detail=f"Conversion job with ID '{job_id}' not found." + ) if job.status != "completed": raise HTTPException( @@ -1237,7 +1315,9 @@ async def download_converted_mod( if not os.path.exists(file_path): print(f"Error: Converted file not found at path: {file_path} for job {job_id}") # This case might indicate an issue post-completion or if the file was manually removed. - raise HTTPException(status_code=404, detail="Converted file not found on server.") + raise HTTPException( + status_code=404, detail="Converted file not found on server." + ) # Determine a user-friendly download filename original_filename_base = os.path.splitext(job.original_filename)[0] @@ -1363,8 +1443,12 @@ async def upsert_addon_details( and will create the addon if it doesn't exist, or update it if it does. For child collections (blocks, assets, recipes), this performs a full replacement. """ - db_addon = await crud.update_addon_details(session=db, addon_id=addon_id, addon_data=addon_data) - if db_addon is None: # Should not happen if crud.update_addon_details works as expected + db_addon = await crud.update_addon_details( + session=db, addon_id=addon_id, addon_data=addon_data + ) + if ( + db_addon is None + ): # Should not happen if crud.update_addon_details works as expected raise HTTPException(status_code=500, detail="Error processing addon data") return db_addon @@ -1391,17 +1475,23 @@ async def create_addon_asset_endpoint( session=db, addon_id=addon_id ) # Using get_addon_details to ensure addon exists if not addon: - raise HTTPException(status_code=404, detail=f"Addon with id {addon_id} not found.") + raise HTTPException( + status_code=404, detail=f"Addon with id {addon_id} not found." + ) try: db_asset = await crud.create_addon_asset( session=db, addon_id=addon_id, file=file, asset_type=asset_type ) - except ValueError as e: # Catch errors like Addon not found from CRUD (though checked above) + except ( + ValueError + ) as e: # Catch errors like Addon not found from CRUD (though checked above) raise HTTPException(status_code=404, detail=str(e)) except Exception as e: logger.error(f"Failed to create addon asset: {e}", exc_info=True) - raise HTTPException(status_code=500, detail=f"Failed to create addon asset: {str(e)}") + raise HTTPException( + status_code=500, detail=f"Failed to create addon asset: {str(e)}" + ) return db_asset @@ -1427,7 +1517,9 @@ async def get_addon_asset_file( file_full_path = os.path.join(crud.BASE_ASSET_PATH, db_asset.path) if not os.path.exists(file_full_path): - logger.error(f"Asset file not found on disk: {file_full_path} for asset_id {asset_id}") + logger.error( + f"Asset file not found on disk: {file_full_path} for asset_id {asset_id}" + ) raise HTTPException(status_code=404, detail="Asset file not found on server.") return FileResponse( @@ -1459,17 +1551,25 @@ async def update_addon_asset_endpoint( ) try: - updated_asset = await crud.update_addon_asset(session=db, asset_id=asset_id, file=file) + updated_asset = await crud.update_addon_asset( + session=db, asset_id=asset_id, file=file + ) except Exception as e: logger.error(f"Failed to update addon asset {asset_id}: {e}", exc_info=True) - raise HTTPException(status_code=500, detail=f"Failed to update addon asset: {str(e)}") + raise HTTPException( + status_code=500, detail=f"Failed to update addon asset: {str(e)}" + ) if not updated_asset: # Should be caught by prior check or raise exception in CRUD - raise HTTPException(status_code=404, detail="Asset not found after update attempt.") + raise HTTPException( + status_code=404, detail="Asset not found after update attempt." + ) return updated_asset -@app.delete("/api/v1/addons/{addon_id}/assets/{asset_id}", status_code=204, tags=["addons"]) +@app.delete( + "/api/v1/addons/{addon_id}/assets/{asset_id}", status_code=204, tags=["addons"] +) async def delete_addon_asset_endpoint( addon_id: PyUUID, # Validate addon ownership of asset asset_id: PyUUID, @@ -1487,7 +1587,9 @@ async def delete_addon_asset_endpoint( deleted_asset_info = await crud.delete_addon_asset(session=db, asset_id=asset_id) if not deleted_asset_info: # Should be caught by prior check - raise HTTPException(status_code=404, detail="Asset not found during delete operation.") + raise HTTPException( + status_code=404, detail="Asset not found during delete operation." + ) # Return 204 No Content by default for DELETE operations # FastAPI will automatically handle returning no body if status_code is 204 @@ -1514,7 +1616,9 @@ async def export_addon_mcaddon(addon_id: PyUUID, db: AsyncSession = Depends(get_ addon_pydantic=addon_details, asset_base_path=crud.BASE_ASSET_PATH ) except Exception as e: - logger.error(f"Error creating .mcaddon package for addon {addon_id}: {e}", exc_info=True) + logger.error( + f"Error creating .mcaddon package for addon {addon_id}: {e}", exc_info=True + ) raise HTTPException(status_code=500, detail=f"Failed to export addon: {str(e)}") # Sanitize addon name for filename diff --git a/backend/src/services/ai_engine_client.py b/backend/src/services/ai_engine_client.py index 5eb077aa..c0c2ea63 100644 --- a/backend/src/services/ai_engine_client.py +++ b/backend/src/services/ai_engine_client.py @@ -20,7 +20,9 @@ # AI Engine configuration AI_ENGINE_URL = os.getenv("AI_ENGINE_URL", "http://ai-engine:8001") -AI_ENGINE_TIMEOUT = httpx.Timeout(1800.0) # 30 minutes timeout for long-running conversions +AI_ENGINE_TIMEOUT = httpx.Timeout( + 1800.0 +) # 30 minutes timeout for long-running conversions # Default poll interval for checking conversion status DEFAULT_POLL_INTERVAL = 2.0 # seconds @@ -69,7 +71,7 @@ async def _get_client(self) -> httpx.AsyncClient: def _get_trace_headers(self) -> Dict[str, str]: """ Get trace context headers for propagating to downstream services. - + Returns: Dictionary of trace headers to include in requests """ @@ -301,7 +303,11 @@ async def poll_conversion_status( except AIEngineError as e: if e.status_code == 404: # Job not found - treat as terminal - yield {"status": "failed", "message": "Job not found", "progress": 0} + yield { + "status": "failed", + "message": "Job not found", + "progress": 0, + } break raise diff --git a/backend/src/services/structured_logging.py b/backend/src/services/structured_logging.py index 22080f19..85abb1a8 100644 --- a/backend/src/services/structured_logging.py +++ b/backend/src/services/structured_logging.py @@ -19,7 +19,9 @@ from structlog.stdlib import ProcessorFormatter # Context variable to store correlation ID across async operations -correlation_id_var: ContextVar[Optional[str]] = ContextVar("correlation_id", default=None) +correlation_id_var: ContextVar[Optional[str]] = ContextVar( + "correlation_id", default=None +) # Context variable to store request metadata request_metadata_var: ContextVar[Optional[Dict[str, Any]]] = ContextVar( @@ -98,7 +100,8 @@ def configure_structlog( else: console_handler.setFormatter( logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S" + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", ) ) root_logger.addHandler(console_handler) @@ -206,7 +209,8 @@ def get_standard_logger(name: str) -> logging.Logger: console_handler.setLevel(logging.INFO) console_formatter = logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S" + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", ) console_handler.setFormatter(console_formatter) logger.addHandler(console_handler) @@ -306,7 +310,9 @@ def __enter__(self): # Bind to structlog context structlog.contextvars.clear_contextvars() - structlog.contextvars.bind_contextvars(correlation_id=self.correlation_id, **self.metadata) + structlog.contextvars.bind_contextvars( + correlation_id=self.correlation_id, **self.metadata + ) return self diff --git a/backend/src/services/tracing.py b/backend/src/services/tracing.py index 2f6b94ae..dca50ddc 100644 --- a/backend/src/services/tracing.py +++ b/backend/src/services/tracing.py @@ -36,88 +36,84 @@ def get_tracer(service_name: str = "modporter-backend") -> trace.Tracer: """ Get or create a tracer instance for the given service. - + Args: service_name: Name of the service for tracing - + Returns: Configured tracer instance """ global _tracer, _tracer_provider - + if _tracer is not None: return _tracer - + # Create resource with service information service_version = os.getenv("SERVICE_VERSION", "1.0.0") - resource = Resource.create({ - SERVICE_NAME: service_name, - SERVICE_VERSION: service_version, - }) - + resource = Resource.create( + { + SERVICE_NAME: service_name, + SERVICE_VERSION: service_version, + } + ) + # Add cloud metadata if available try: ec2_resource = AwsEc2ResourceDetector().detect() resource = resource.merge(ec2_resource) except Exception: pass - + try: ecs_resource = AwsEcsResourceDetector().detect() resource = resource.merge(ecs_resource) except Exception: pass - + # Create tracer provider _tracer_provider = TracerProvider(resource=resource) - + # Configure exporter based on environment tracing_enabled = os.getenv("TRACING_ENABLED", "true").lower() == "true" tracing_exporter = os.getenv("TRACING_EXPORTER", "jaeger").lower() - + if tracing_enabled: if tracing_exporter == "jaeger": # Jaeger exporter configuration jaeger_host = os.getenv("JAEGER_HOST", "localhost") jaeger_port = int(os.getenv("JAEGER_PORT", "6831")) - + jaeger_exporter = JaegerExporter( agent_host_name=jaeger_host, agent_port=jaeger_port, ) - _tracer_provider.add_span_processor( - BatchSpanProcessor(jaeger_exporter) - ) + _tracer_provider.add_span_processor(BatchSpanProcessor(jaeger_exporter)) logger.info(f"Jaeger tracing enabled: {jaeger_host}:{jaeger_port}") - + elif tracing_exporter == "otlp": # OTLP exporter configuration otlp_endpoint = os.getenv("OTLP_ENDPOINT", "http://localhost:4317") - + otlp_exporter = OTLPSpanExporter( endpoint=otlp_endpoint, insecure=True, ) - _tracer_provider.add_span_processor( - BatchSpanProcessor(otlp_exporter) - ) + _tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter)) logger.info(f"OTLP tracing enabled: {otlp_endpoint}") - + # Add console exporter for development if os.getenv("TRACING_CONSOLE", "false").lower() == "true": - _tracer_provider.add_span_processor( - BatchSpanProcessor(ConsoleSpanExporter()) - ) + _tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter())) logger.info("Console span exporter enabled") - + # Set the global tracer provider trace.set_tracer_provider(_tracer_provider) - + # Create and return tracer _tracer = trace.get_tracer(service_name) - + logger.info(f"Tracing initialized for service: {service_name}") - + return _tracer @@ -130,19 +126,19 @@ def init_tracing( ) -> trace.Tracer: """ Initialize tracing with automatic instrumentation. - + Args: app: FastAPI application instance (optional) service_name: Name of the service instrument_fastapi: Whether to instrument FastAPI instrument_httpx: Whether to instrument HTTPX instrument_redis: Whether to instrument Redis - + Returns: Configured tracer instance """ tracer = get_tracer(service_name) - + # Instrument FastAPI if app provided if app and instrument_fastapi: try: @@ -150,7 +146,7 @@ def init_tracing( logger.info("FastAPI instrumentation enabled") except Exception as e: logger.warning(f"Failed to instrument FastAPI: {e}") - + # Instrument HTTPX if instrument_httpx: try: @@ -158,7 +154,7 @@ def init_tracing( logger.info("HTTPX instrumentation enabled") except Exception as e: logger.warning(f"Failed to instrument HTTPX: {e}") - + # Instrument Redis if instrument_redis: try: @@ -166,17 +162,17 @@ def init_tracing( logger.info("Redis instrumentation enabled") except Exception as e: logger.warning(f"Failed to instrument Redis: {e}") - + return tracer def extract_trace_context(carrier: dict) -> Context: """ Extract trace context from carrier (e.g., HTTP headers). - + Args: carrier: Dictionary containing trace context (e.g., HTTP headers) - + Returns: Extracted context """ @@ -186,10 +182,10 @@ def extract_trace_context(carrier: dict) -> Context: def inject_trace_context(carrier: dict) -> dict: """ Inject trace context into carrier (e.g., HTTP headers). - + Args: carrier: Dictionary to inject trace context into - + Returns: Carrier with injected trace context """ @@ -204,17 +200,17 @@ def create_span( ) -> trace.Span: """ Create a new span with the given name and context. - + Args: name: Name of the span context: Parent context (optional) kind: Span kind - + Returns: New span """ tracer = get_tracer() - + if context: with tracer.start_as_current_span(name, context=context, kind=kind) as span: return span @@ -226,7 +222,7 @@ def create_span( def add_span_attributes(span: trace.Span, attributes: dict) -> None: """ Add attributes to a span. - + Args: span: Span to add attributes to attributes: Dictionary of attributes @@ -239,7 +235,7 @@ def add_span_attributes(span: trace.Span, attributes: dict) -> None: def record_span_exception(span: trace.Span, exception: Exception) -> None: """ Record an exception on a span. - + Args: span: Span to record exception on exception: Exception to record @@ -251,7 +247,7 @@ def record_span_exception(span: trace.Span, exception: Exception) -> None: def shutdown_tracing() -> None: """Shutdown the tracing provider and flush any pending spans.""" global _tracer_provider - + if _tracer_provider: _tracer_provider.shutdown() logger.info("Tracing provider shutdown") @@ -260,28 +256,28 @@ def shutdown_tracing() -> None: class TracingMiddleware: """ Middleware for FastAPI to handle trace context propagation. - + This middleware extracts trace context from incoming requests and injects it into outgoing requests. """ - + def __init__(self, app): self.app = app - + async def __call__(self, scope, receive, send): if scope["type"] != "http": await self.app(scope, receive, send) return - + # Extract trace context from headers headers = dict(scope.get("headers", [])) # Convert bytes to string for headers headers = {k.decode(): v.decode() for k, v in headers.items()} - + # The FastAPI instrumentation will handle this automatically, # but we keep this for custom use cases context = extract_trace_context(headers) - + # Continue with the request await self.app(scope, receive, send) @@ -289,7 +285,7 @@ async def __call__(self, scope, receive, send): def get_current_span() -> Optional[trace.Span]: """ Get the current active span if any. - + Returns: Current span or None """ @@ -299,26 +295,26 @@ def get_current_span() -> Optional[trace.Span]: def get_trace_id() -> Optional[str]: """ Get the current trace ID as a hex string. - + Returns: Trace ID or None """ span = get_current_span() if span: trace_id = span.get_span_context().trace_id - return format(trace_id, '032x') + return format(trace_id, "032x") return None def get_span_id() -> Optional[str]: """ Get the current span ID as a hex string. - + Returns: Span ID or None """ span = get_current_span() if span: span_id = span.get_span_context().span_id - return format(span_id, '016x') + return format(span_id, "016x") return None From 5f3977fccf18f500f830dd92b6c9d2097579f8aa Mon Sep 17 00:00:00 2001 From: openhands Date: Sun, 8 Mar 2026 19:44:50 -0400 Subject: [PATCH 5/7] style: Fix ruff formatting issues - backend/src/main.py - backend/src/services/ai_engine_client.py - backend/src/services/structured_logging.py - ai-engine/main.py - ai-engine/utils/logging_config.py Co-authored-by: openhands --- ai-engine/main.py | 64 ++---- ai-engine/utils/logging_config.py | 81 +++----- backend/src/main.py | 214 ++++++--------------- backend/src/services/ai_engine_client.py | 4 +- backend/src/services/structured_logging.py | 8 +- 5 files changed, 100 insertions(+), 271 deletions(-) diff --git a/ai-engine/main.py b/ai-engine/main.py index 2f87ab54..0411b0ba 100644 --- a/ai-engine/main.py +++ b/ai-engine/main.py @@ -93,9 +93,9 @@ class ConversionStatusEnum(str, Enum): ) # CORS middleware - Restrict origins for security -allowed_origins = os.getenv( - "ALLOWED_ORIGINS", "http://localhost:3000,http://localhost:8080" -).split(",") +allowed_origins = os.getenv("ALLOWED_ORIGINS", "http://localhost:3000,http://localhost:8080").split( + "," +) app.add_middleware( CORSMiddleware, allow_origins=allowed_origins, @@ -120,20 +120,14 @@ async def set_job_status(self, job_id: str, status: "ConversionStatus") -> None: """Store job status in Redis with error handling""" try: if not self.available: - raise HTTPException( - status_code=503, detail="Job state storage unavailable" - ) + raise HTTPException(status_code=503, detail="Job state storage unavailable") status_dict = status.model_dump() status_dict["started_at"] = ( - status_dict["started_at"].isoformat() - if status_dict["started_at"] - else None + status_dict["started_at"].isoformat() if status_dict["started_at"] else None ) status_dict["completed_at"] = ( - status_dict["completed_at"].isoformat() - if status_dict["completed_at"] - else None + status_dict["completed_at"].isoformat() if status_dict["completed_at"] else None ) await self.redis.set( @@ -159,19 +153,13 @@ async def get_job_status(self, job_id: str) -> Optional["ConversionStatus"]: status_dict = json.loads(data) # Convert ISO strings back to datetime if status_dict.get("started_at"): - status_dict["started_at"] = datetime.fromisoformat( - status_dict["started_at"] - ) + status_dict["started_at"] = datetime.fromisoformat(status_dict["started_at"]) if status_dict.get("completed_at"): - status_dict["completed_at"] = datetime.fromisoformat( - status_dict["completed_at"] - ) + status_dict["completed_at"] = datetime.fromisoformat(status_dict["completed_at"]) return ConversionStatus(**status_dict) except Exception as e: - logger.error( - f"Failed to retrieve job status from Redis: {e}", exc_info=True - ) + logger.error(f"Failed to retrieve job status from Redis: {e}", exc_info=True) self.available = False return None @@ -307,9 +295,7 @@ async def health_check(): @app.post("/api/v1/convert", response_model=ConversionResponse, tags=["conversion"]) -async def start_conversion( - request: ConversionRequest, background_tasks: BackgroundTasks -): +async def start_conversion(request: ConversionRequest, background_tasks: BackgroundTasks): """Start a new mod conversion job""" if not job_manager or not job_manager.available: @@ -350,9 +336,7 @@ async def start_conversion( ) -@app.get( - "/api/v1/status/{job_id}", response_model=ConversionStatus, tags=["conversion"] -) +@app.get("/api/v1/status/{job_id}", response_model=ConversionStatus, tags=["conversion"]) async def get_conversion_status(job_id: str): """Get the status of a conversion job""" @@ -374,9 +358,7 @@ async def list_jobs(): # Note: In production, implement pagination and filtering # For now, return empty list as Redis doesn't have easy "list all" without keys - logger.warning( - "list_jobs endpoint returns empty - implement Redis SCAN for production" - ) + logger.warning("list_jobs endpoint returns empty - implement Redis SCAN for production") return [] @@ -404,9 +386,7 @@ async def process_conversion( job_status.progress = 10 await job_manager.set_job_status(job_id, job_status) - logger.info( - f"Processing conversion for job {job_id} with variant {experiment_variant}" - ) + logger.info(f"Processing conversion for job {job_id} with variant {experiment_variant}") # Create progress callback for real-time updates if available if PROGRESS_CALLBACK_AVAILABLE and create_progress_callback: @@ -434,9 +414,7 @@ async def process_conversion( crew = ModPorterConversionCrew( variant_id=experiment_variant, progress_callback=progress_callback ) - logger.info( - f"ModPorterConversionCrew initialized with variant: {experiment_variant}" - ) + logger.info(f"ModPorterConversionCrew initialized with variant: {experiment_variant}") # Update status for analysis stage job_status = await job_manager.get_job_status(job_id) @@ -462,7 +440,9 @@ async def process_conversion( job_status = await job_manager.get_job_status(job_id) if job_status: job_status.status = "failed" - job_status.message = f"Conversion failed: {conversion_result.get('error', 'Unknown error')}" + job_status.message = ( + f"Conversion failed: {conversion_result.get('error', 'Unknown error')}" + ) await job_manager.set_job_status(job_id, job_status) logger.error( f"Conversion failed for job {job_id}: {conversion_result.get('error')}" @@ -493,9 +473,7 @@ async def process_conversion( # Verify output file was created if not os.path.exists(output_path): - logger.error( - f"Output file not created by conversion crew: {output_path}" - ) + logger.error(f"Output file not created by conversion crew: {output_path}") logger.error( "This indicates a serious conversion failure that should not be masked" ) @@ -547,11 +525,7 @@ async def process_conversion( ) finally: # Clean up progress callback - if ( - progress_callback - and PROGRESS_CALLBACK_AVAILABLE - and cleanup_progress_callback - ): + if progress_callback and PROGRESS_CALLBACK_AVAILABLE and cleanup_progress_callback: try: await cleanup_progress_callback(job_id) logger.info(f"Cleaned up progress callback for job {job_id}") diff --git a/ai-engine/utils/logging_config.py b/ai-engine/utils/logging_config.py index dec5f409..f1304700 100644 --- a/ai-engine/utils/logging_config.py +++ b/ai-engine/utils/logging_config.py @@ -27,9 +27,7 @@ import json # Context variable for correlation ID -correlation_id_var: ContextVar[Optional[str]] = ContextVar( - "correlation_id", default=None -) +correlation_id_var: ContextVar[Optional[str]] = ContextVar("correlation_id", default=None) def configure_structlog( @@ -111,7 +109,10 @@ def configure_structlog( log_file = os.path.join(log_dir, "ai-engine.log") file_handler = logging.handlers.RotatingFileHandler( - log_file, maxBytes=10 * 1024 * 1024, backupCount=5, encoding="utf-8" # 10MB + log_file, + maxBytes=10 * 1024 * 1024, + backupCount=5, + encoding="utf-8", # 10MB ) file_handler.setLevel(logging.INFO) file_handler.setFormatter(logging.Formatter("%(message)s")) @@ -129,9 +130,7 @@ def __init__(self, include_agent_context: bool = True): def format(self, record: logging.LogRecord) -> str: # Base format with timestamp, level, and logger name - timestamp = datetime.fromtimestamp(record.created).strftime( - "%Y-%m-%d %H:%M:%S.%f" - )[:-3] + timestamp = datetime.fromtimestamp(record.created).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] # Extract agent name from logger name (e.g., 'agents.java_analyzer' -> 'JavaAnalyzer') logger_parts = record.name.split(".") @@ -147,9 +146,7 @@ def format(self, record: logging.LogRecord) -> str: agent_name = record.name.split(".")[-1].title() # Build the log message - base_msg = ( - f"{timestamp} [{record.levelname}] {agent_name}: {record.getMessage()}" - ) + base_msg = f"{timestamp} [{record.levelname}] {agent_name}: {record.getMessage()}" # Add extra context if available if hasattr(record, "agent_context") and self.include_agent_context: @@ -207,13 +204,9 @@ def log_operation_start(self, operation: str, **context): def log_operation_complete(self, operation: str, duration: float, **context): """Log the completion of an operation with timing""" - self.info( - f"Completed {operation}", operation_time=duration, agent_context=context - ) + self.info(f"Completed {operation}", operation_time=duration, agent_context=context) - def log_tool_usage( - self, tool_name: str, result: Any = None, duration: Optional[float] = None - ): + def log_tool_usage(self, tool_name: str, result: Any = None, duration: Optional[float] = None): """Log tool usage with results""" kwargs = {"tool_name": tool_name} if result is not None: @@ -321,9 +314,7 @@ def setup_logging( logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING) - logging.getLogger("crewai").setLevel( - logging.INFO if debug_mode else logging.WARNING - ) + logging.getLogger("crewai").setLevel(logging.INFO if debug_mode else logging.WARNING) # Log the configuration logger = logging.getLogger(__name__) @@ -428,9 +419,7 @@ def wrapper(*args, **kwargs): return result except Exception as e: duration = time.time() - start_time - logger.error( - f"Operation {op_name} failed after {duration:.3f}s: {str(e)}" - ) + logger.error(f"Operation {op_name} failed after {duration:.3f}s: {str(e)}") raise return wrapper @@ -443,9 +432,7 @@ def wrapper(*args, **kwargs): # ============================================================================ # Context variable for tracking operation context across async boundaries -_operation_context: ContextVar[Dict[str, Any]] = ContextVar( - "operation_context", default={} -) +_operation_context: ContextVar[Dict[str, Any]] = ContextVar("operation_context", default={}) @dataclass @@ -517,9 +504,7 @@ def get_agent_statistics(self, agent_name: str = None) -> Dict[str, Any]: if d.confidence is not None: avg_confidence += d.confidence avg_confidence /= ( - len([d for d in decisions if d.confidence is not None]) - if decisions - else 1 + len([d for d in decisions if d.confidence is not None]) if decisions else 1 ) # Calculate tool usage statistics @@ -542,9 +527,7 @@ def get_agent_statistics(self, agent_name: str = None) -> Dict[str, Any]: # Calculate success rates success_rates = {} for tool, counts in tool_success_rate.items(): - success_rates[tool] = ( - counts["success"] / counts["total"] if counts["total"] > 0 else 0 - ) + success_rates[tool] = counts["success"] / counts["total"] if counts["total"] > 0 else 0 return { "agent_name": agent_name or "all", @@ -557,9 +540,7 @@ def get_agent_statistics(self, agent_name: str = None) -> Dict[str, Any]: "average_tool_durations_ms": avg_durations, } - def get_decision_trace( - self, agent_name: str = None, limit: int = 100 - ) -> List[Dict]: + def get_decision_trace(self, agent_name: str = None, limit: int = 100) -> List[Dict]: """Get a trace of recent decisions""" with self._lock: decisions = self.decisions.copy() @@ -572,9 +553,7 @@ def get_decision_trace( return [asdict(d) for d in decisions] - def get_tool_usage_trace( - self, agent_name: str = None, limit: int = 100 - ) -> List[Dict]: + def get_tool_usage_trace(self, agent_name: str = None, limit: int = 100) -> List[Dict]: """Get a trace of recent tool usages""" with self._lock: usages = self.tool_usages.copy() @@ -632,9 +611,7 @@ class EnhancedAgentLogger(AgentLogger): def __init__(self, name: str, debug_mode: bool = False): super().__init__(name) - self._debug_mode = ( - debug_mode or os.getenv("AGENT_DEBUG_MODE", "false").lower() == "true" - ) + self._debug_mode = debug_mode or os.getenv("AGENT_DEBUG_MODE", "false").lower() == "true" self._analyzer = get_log_analyzer() self._operation_stack: List[Dict[str, Any]] = [] self._lock = threading.Lock() @@ -688,16 +665,12 @@ def log_decision( if self._debug_mode: # Verbose output in debug mode - self.info( - f"DECISION [{decision_type}]: {decision}", agent_context=log_context - ) + self.info(f"DECISION [{decision_type}]: {decision}", agent_context=log_context) self.debug(f" Reasoning: {reasoning}") if alternatives: self.debug(f" Alternatives considered: {alternatives}") else: - self.info( - f"Decision: {decision_type} -> {decision}", agent_context=log_context - ) + self.info(f"Decision: {decision_type} -> {decision}", agent_context=log_context) def log_tool_call( self, @@ -769,9 +742,7 @@ def log_reasoning_step(self, step: str, details: str = None): if details: self.debug(f" Details: {details}") - def log_state_change( - self, state_name: str, old_value: Any, new_value: Any, reason: str = None - ): + def log_state_change(self, state_name: str, old_value: Any, new_value: Any, reason: str = None): """ Log a state change in the agent. @@ -856,14 +827,10 @@ def log_performance_metric(self, metric_name: str, value: float, unit: str = Non "metric_value": value, "metric_unit": unit, } - self.debug( - f"Performance: {metric_name} = {value}{unit or ''}", agent_context=context - ) + self.debug(f"Performance: {metric_name} = {value}{unit or ''}", agent_context=context) -def get_enhanced_agent_logger( - agent_name: str, debug_mode: bool = False -) -> EnhancedAgentLogger: +def get_enhanced_agent_logger(agent_name: str, debug_mode: bool = False) -> EnhancedAgentLogger: """ Get an enhanced agent logger with comprehensive logging capabilities. @@ -898,9 +865,7 @@ def export_log_analysis(filepath: str = None): if filepath is None: log_dir = Path(os.getenv("LOG_DIR", "/tmp/modporter-ai/logs")) log_dir.mkdir(parents=True, exist_ok=True) - filepath = ( - log_dir / f"agent_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" - ) + filepath = log_dir / f"agent_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" analyzer = get_log_analyzer() analyzer.export_analysis(str(filepath)) diff --git a/backend/src/main.py b/backend/src/main.py index 2ed0c673..4c48bb47 100644 --- a/backend/src/main.py +++ b/backend/src/main.py @@ -204,31 +204,21 @@ async def shutdown_event(): # Include API routers -app.include_router( - performance.router, prefix="/api/v1/performance", tags=["performance"] -) -app.include_router( - behavioral_testing.router, prefix="/api/v1", tags=["behavioral-testing"] -) +app.include_router(performance.router, prefix="/api/v1/performance", tags=["performance"]) +app.include_router(behavioral_testing.router, prefix="/api/v1", tags=["behavioral-testing"]) app.include_router(validation.router, prefix="/api/v1/validation", tags=["validation"]) app.include_router(comparison.router, prefix="/api/v1/comparison", tags=["comparison"]) app.include_router(embeddings.router, prefix="/api/v1/embeddings", tags=["embeddings"]) app.include_router(feedback.router, prefix="/api/v1", tags=["feedback"]) -app.include_router( - experiments.router, prefix="/api/v1/experiments", tags=["experiments"] -) +app.include_router(experiments.router, prefix="/api/v1/experiments", tags=["experiments"]) app.include_router(behavior_files.router, prefix="/api/v1", tags=["behavior-files"]) -app.include_router( - behavior_templates.router, prefix="/api/v1", tags=["behavior-templates"] -) +app.include_router(behavior_templates.router, prefix="/api/v1", tags=["behavior-templates"]) app.include_router(behavior_export.router, prefix="/api/v1", tags=["behavior-export"]) app.include_router(advanced_events.router, prefix="/api/v1", tags=["advanced-events"]) app.include_router(conversions.router) # Conversions API + WebSocket app.include_router(mod_imports.router, prefix="/api/v1/mods", tags=["mod-imports"]) app.include_router(analytics.router, prefix="/api/v1/analytics", tags=["analytics"]) -app.include_router( - rate_limit_dashboard_router, prefix="/api/v1/rate-limit", tags=["rate-limiting"] -) +app.include_router(rate_limit_dashboard_router, prefix="/api/v1/rate-limit", tags=["rate-limiting"]) # Health check endpoints (no prefix - used for Kubernetes probes) app.include_router(health.router) @@ -249,9 +239,7 @@ class ConversionRequest(BaseModel): target_version: str = Field( default="1.20.0", description="Target Minecraft version for the conversion." ) - options: Optional[dict] = Field( - default=None, description="Optional conversion settings." - ) + options: Optional[dict] = Field(default=None, description="Optional conversion settings.") @property def resolved_file_id(self) -> str: @@ -266,12 +254,8 @@ def resolved_original_name(self) -> str: class UploadResponse(BaseModel): """Response model for file upload""" - file_id: str = Field( - ..., description="Unique identifier assigned to the uploaded file." - ) - original_filename: str = Field( - ..., description="The original name of the uploaded file." - ) + file_id: str = Field(..., description="Unique identifier assigned to the uploaded file.") + original_filename: str = Field(..., description="The original name of the uploaded file.") saved_filename: str = Field( ..., description="The name under which the file is saved on the server (job_id + extension).", @@ -281,9 +265,7 @@ class UploadResponse(BaseModel): default=None, description="Detected content type of the uploaded file." ) message: str = Field(..., description="Status message confirming the upload.") - filename: str = Field( - ..., description="The uploaded filename (matches original_filename)" - ) + filename: str = Field(..., description="The uploaded filename (matches original_filename)") class ConversionResponse(BaseModel): @@ -438,14 +420,12 @@ async def simulate_ai_conversion(job_id: str): logger.error(f"Error: Job {job_id} not found for AI simulation.") return - original_mod_name = job.input_data.get( - "original_filename", "ConvertedAddon" - ).split(".")[0] + original_mod_name = job.input_data.get("original_filename", "ConvertedAddon").split( + "." + )[0] # Attempt to get user_id from job input_data, fall back to a default if not found # This field might not exist in older job records. - user_id_for_addon = job.input_data.get( - "user_id", conversion_parser.DEFAULT_USER_ID - ) + user_id_for_addon = job.input_data.get("user_id", conversion_parser.DEFAULT_USER_ID) def mirror_dict_from_job( current_job, progress_val=None, result_url=None, error_message=None @@ -458,9 +438,7 @@ def mirror_dict_from_job( progress=( progress_val if progress_val is not None - else ( - current_job.progress.progress if current_job.progress else 0 - ) + else (current_job.progress.progress if current_job.progress else 0) ), target_version=current_job.input_data.get("target_version"), options=current_job.input_data.get("options"), @@ -487,17 +465,13 @@ def mirror_dict_from_job( ) # Stage 2: Preprocessing -> Processing - job = await crud.update_job_status( - session, PyUUID(job_id), "processing" - ) + job = await crud.update_job_status(session, PyUUID(job_id), "processing") await crud.upsert_progress(session, PyUUID(job_id), 25) mirror = mirror_dict_from_job(job, 25) conversion_jobs_db[job_id] = mirror # Keep legacy mirror for now await cache.set_job_status(job_id, mirror.model_dump()) await cache.set_progress(job_id, 25) - logger.info( - f"Job {job_id}: Status updated to {job.status}, Progress: 25%" - ) + logger.info(f"Job {job_id}: Status updated to {job.status}, Progress: 25%") # Stage 3: BedrockArchitectAgent (25-50%) await ProgressHandler.broadcast_agent_start( @@ -523,17 +497,13 @@ def mirror_dict_from_job( if job.status == "cancelled": logger.info(f"Job {job_id} was cancelled. Stopping AI simulation.") return - job = await crud.update_job_status( - session, PyUUID(job_id), "postprocessing" - ) + job = await crud.update_job_status(session, PyUUID(job_id), "postprocessing") await crud.upsert_progress(session, PyUUID(job_id), 50) mirror = mirror_dict_from_job(job, 50) conversion_jobs_db[job_id] = mirror await cache.set_job_status(job_id, mirror.model_dump()) await cache.set_progress(job_id, 50) - logger.info( - f"Job {job_id}: Status updated to {job.status}, Progress: 50%" - ) + logger.info(f"Job {job_id}: Status updated to {job.status}, Progress: 50%") # Stage 5: LogicTranslatorAgent (50-75%) await ProgressHandler.broadcast_agent_start( @@ -652,9 +622,7 @@ def mirror_dict_from_job( f, ) # Dummy block behavior - with open( - os.path.join(bp_dir, "blocks", "simulated_block.json"), "w" - ) as f: + with open(os.path.join(bp_dir, "blocks", "simulated_block.json"), "w") as f: json.dump( { "minecraft:block": { @@ -667,9 +635,7 @@ def mirror_dict_from_job( f, ) # Dummy recipe - with open( - os.path.join(bp_dir, "recipes", "simulated_recipe.json"), "w" - ) as f: + with open(os.path.join(bp_dir, "recipes", "simulated_recipe.json"), "w") as f: json.dump( { "minecraft:recipe_shaped": { @@ -700,9 +666,7 @@ def mirror_dict_from_job( ) # Save Addon, Blocks, Recipes (assets list in addon_data_upload is empty) - await crud.update_addon_details( - session, PyUUID(job_id), addon_data_upload - ) + await crud.update_addon_details(session, PyUUID(job_id), addon_data_upload) logger.info( f"Job {job_id}: Addon core data (metadata, blocks, recipes) saved to DB." ) @@ -722,27 +686,19 @@ def mirror_dict_from_job( # Asset conversion integration - convert uploaded assets using AI engine try: - logger.info( - f"Job {job_id}: Starting asset conversion for conversion job" - ) + logger.info(f"Job {job_id}: Starting asset conversion for conversion job") asset_conversion_result = ( - await asset_conversion_service.convert_assets_for_conversion( - job_id - ) + await asset_conversion_service.convert_assets_for_conversion(job_id) ) if asset_conversion_result.get("success"): - converted_count = asset_conversion_result.get( - "converted_count", 0 - ) + converted_count = asset_conversion_result.get("converted_count", 0) failed_count = asset_conversion_result.get("failed_count", 0) logger.info( f"Job {job_id}: Asset conversion completed - {converted_count} converted, {failed_count} failed" ) else: - logger.warning( - f"Job {job_id}: Asset conversion batch had issues" - ) + logger.warning(f"Job {job_id}: Asset conversion batch had issues") except Exception as asset_error: logger.error(f"Job {job_id}: Asset conversion error: {asset_error}") @@ -750,9 +706,7 @@ def mirror_dict_from_job( # Original ZIP creation (can be retained or removed) os.makedirs(CONVERSION_OUTPUTS_DIR, exist_ok=True) - mock_output_filename_internal = ( - f"{job.id}_converted.zip" # Original ZIP name - ) + mock_output_filename_internal = f"{job.id}_converted.zip" # Original ZIP name mock_output_filepath = os.path.join( CONVERSION_OUTPUTS_DIR, mock_output_filename_internal ) @@ -764,9 +718,7 @@ def mirror_dict_from_job( "zip", simulated_pack_output_path, ) - logger.info( - f"Job {job_id}: Original ZIP archive created at {mock_output_filepath}" - ) + logger.info(f"Job {job_id}: Original ZIP archive created at {mock_output_filepath}") job = await crud.update_job_status(session, PyUUID(job_id), "completed") await crud.upsert_progress(session, PyUUID(job_id), 100) @@ -792,9 +744,7 @@ def mirror_dict_from_job( conversion_jobs_db[job_id] = mirror await cache.set_job_status(job_id, mirror.model_dump()) await cache.set_progress(job_id, 0) - logger.error( - f"Job {job_id}: Status updated to FAILED due to error in processing." - ) + logger.error(f"Job {job_id}: Status updated to FAILED due to error in processing.") # Broadcast failure to WebSocket clients await ProgressHandler.broadcast_conversion_failed(job_id, str(e_inner)) @@ -826,9 +776,7 @@ def mirror_dict_from_job( job_data_uuid_key = conversion_jobs_db[PyUUID(job_id)] job_data_uuid_key.status = "failed" # ... update other fields ... - await cache.set_job_status( - str(PyUUID(job_id)), job_data_uuid_key.model_dump() - ) + await cache.set_job_status(str(PyUUID(job_id)), job_data_uuid_key.model_dump()) except Exception as cache_error: logger.error( @@ -847,9 +795,7 @@ async def call_ai_engine_conversion(job_id: str): print(f"Error: Job {job_id} not found for AI Engine conversion.") return - def mirror_dict_from_job( - job, progress_val=None, result_url=None, error_message=None - ): + def mirror_dict_from_job(job, progress_val=None, result_url=None, error_message=None): # Compose dict for legacy mirror return ConversionJob( job_id=str(job.id), @@ -876,9 +822,7 @@ def mirror_dict_from_job( output_path = os.path.join(CONVERSION_OUTPUTS_DIR, output_filename) # Get the input file path - input_file_path = os.path.join( - TEMP_UPLOADS_DIR, f"{job.input_data.get('file_id')}.jar" - ) + input_file_path = os.path.join(TEMP_UPLOADS_DIR, f"{job.input_data.get('file_id')}.jar") # Call AI Engine conversion_options = job.input_data.get("options", {}) @@ -890,15 +834,11 @@ def mirror_dict_from_job( "conversion_options": conversion_options, } - print( - f"Calling AI Engine at {AI_ENGINE_URL}/api/v1/convert with request: {ai_request}" - ) + print(f"Calling AI Engine at {AI_ENGINE_URL}/api/v1/convert with request: {ai_request}") async with httpx.AsyncClient(timeout=600.0) as client: # 10 minute timeout # Start AI Engine conversion - response = await client.post( - f"{AI_ENGINE_URL}/api/v1/convert", json=ai_request - ) + response = await client.post(f"{AI_ENGINE_URL}/api/v1/convert", json=ai_request) if response.status_code != 200: raise Exception( @@ -914,20 +854,14 @@ def mirror_dict_from_job( # Check if job was cancelled current_job = await crud.get_job(session, job_id) if current_job.status == "cancelled": - print( - f"Job {job_id} was cancelled. Stopping AI Engine polling." - ) + print(f"Job {job_id} was cancelled. Stopping AI Engine polling.") return # Get status from AI Engine - status_response = await client.get( - f"{AI_ENGINE_URL}/api/v1/status/{job_id}" - ) + status_response = await client.get(f"{AI_ENGINE_URL}/api/v1/status/{job_id}") if status_response.status_code != 200: - print( - f"Failed to get AI Engine status: {status_response.status_code}" - ) + print(f"Failed to get AI Engine status: {status_response.status_code}") continue ai_status = status_response.json() @@ -967,9 +901,7 @@ def mirror_dict_from_job( ) # Verify the file exists if not os.path.exists(output_path): - print( - f"Warning: Expected output file not found at {output_path}" - ) + print(f"Warning: Expected output file not found at {output_path}") else: print(f"Job {job_id}: AI Engine conversion FAILED") @@ -1102,9 +1034,7 @@ async def get_conversion_status( descriptive_message = "Conversion completed successfully." elif status == "failed": descriptive_message = ( - f"Conversion failed: {error_message}" - if error_message - else "Conversion failed." + f"Conversion failed: {error_message}" if error_message else "Conversion failed." ) elif status == "cancelled": descriptive_message = "Job was cancelled by the user." @@ -1126,9 +1056,7 @@ async def get_conversion_status( # Fallback: load from DB job = await crud.get_job(db, job_id) if not job: - raise HTTPException( - status_code=404, detail=f"Conversion job with ID '{job_id}' not found." - ) + raise HTTPException(status_code=404, detail=f"Conversion job with ID '{job_id}' not found.") progress = job.progress.progress if job.progress else 0 error_message = None result_url = None @@ -1180,9 +1108,7 @@ async def get_conversion_status( ) -@app.get( - "/api/v1/conversions", response_model=List[ConversionStatus], tags=["conversion"] -) +@app.get("/api/v1/conversions", response_model=List[ConversionStatus], tags=["conversion"]) async def list_conversions(db: AsyncSession = Depends(get_db)): """ List all current and past conversion jobs. @@ -1243,9 +1169,7 @@ async def cancel_conversion( """ job = await crud.get_job(db, job_id) if not job: - raise HTTPException( - status_code=404, detail=f"Conversion job with ID '{job_id}' not found." - ) + raise HTTPException(status_code=404, detail=f"Conversion job with ID '{job_id}' not found.") if job.status == "cancelled": return {"message": f"Conversion job {job_id} is already cancelled."} job = await crud.update_job_status(db, job_id, "cancelled") @@ -1287,9 +1211,7 @@ async def download_converted_mod( job = conversion_jobs_db.get(job_id) if not job: - raise HTTPException( - status_code=404, detail=f"Conversion job with ID '{job_id}' not found." - ) + raise HTTPException(status_code=404, detail=f"Conversion job with ID '{job_id}' not found.") if job.status != "completed": raise HTTPException( @@ -1315,9 +1237,7 @@ async def download_converted_mod( if not os.path.exists(file_path): print(f"Error: Converted file not found at path: {file_path} for job {job_id}") # This case might indicate an issue post-completion or if the file was manually removed. - raise HTTPException( - status_code=404, detail="Converted file not found on server." - ) + raise HTTPException(status_code=404, detail="Converted file not found on server.") # Determine a user-friendly download filename original_filename_base = os.path.splitext(job.original_filename)[0] @@ -1443,12 +1363,8 @@ async def upsert_addon_details( and will create the addon if it doesn't exist, or update it if it does. For child collections (blocks, assets, recipes), this performs a full replacement. """ - db_addon = await crud.update_addon_details( - session=db, addon_id=addon_id, addon_data=addon_data - ) - if ( - db_addon is None - ): # Should not happen if crud.update_addon_details works as expected + db_addon = await crud.update_addon_details(session=db, addon_id=addon_id, addon_data=addon_data) + if db_addon is None: # Should not happen if crud.update_addon_details works as expected raise HTTPException(status_code=500, detail="Error processing addon data") return db_addon @@ -1475,23 +1391,17 @@ async def create_addon_asset_endpoint( session=db, addon_id=addon_id ) # Using get_addon_details to ensure addon exists if not addon: - raise HTTPException( - status_code=404, detail=f"Addon with id {addon_id} not found." - ) + raise HTTPException(status_code=404, detail=f"Addon with id {addon_id} not found.") try: db_asset = await crud.create_addon_asset( session=db, addon_id=addon_id, file=file, asset_type=asset_type ) - except ( - ValueError - ) as e: # Catch errors like Addon not found from CRUD (though checked above) + except ValueError as e: # Catch errors like Addon not found from CRUD (though checked above) raise HTTPException(status_code=404, detail=str(e)) except Exception as e: logger.error(f"Failed to create addon asset: {e}", exc_info=True) - raise HTTPException( - status_code=500, detail=f"Failed to create addon asset: {str(e)}" - ) + raise HTTPException(status_code=500, detail=f"Failed to create addon asset: {str(e)}") return db_asset @@ -1517,9 +1427,7 @@ async def get_addon_asset_file( file_full_path = os.path.join(crud.BASE_ASSET_PATH, db_asset.path) if not os.path.exists(file_full_path): - logger.error( - f"Asset file not found on disk: {file_full_path} for asset_id {asset_id}" - ) + logger.error(f"Asset file not found on disk: {file_full_path} for asset_id {asset_id}") raise HTTPException(status_code=404, detail="Asset file not found on server.") return FileResponse( @@ -1551,25 +1459,17 @@ async def update_addon_asset_endpoint( ) try: - updated_asset = await crud.update_addon_asset( - session=db, asset_id=asset_id, file=file - ) + updated_asset = await crud.update_addon_asset(session=db, asset_id=asset_id, file=file) except Exception as e: logger.error(f"Failed to update addon asset {asset_id}: {e}", exc_info=True) - raise HTTPException( - status_code=500, detail=f"Failed to update addon asset: {str(e)}" - ) + raise HTTPException(status_code=500, detail=f"Failed to update addon asset: {str(e)}") if not updated_asset: # Should be caught by prior check or raise exception in CRUD - raise HTTPException( - status_code=404, detail="Asset not found after update attempt." - ) + raise HTTPException(status_code=404, detail="Asset not found after update attempt.") return updated_asset -@app.delete( - "/api/v1/addons/{addon_id}/assets/{asset_id}", status_code=204, tags=["addons"] -) +@app.delete("/api/v1/addons/{addon_id}/assets/{asset_id}", status_code=204, tags=["addons"]) async def delete_addon_asset_endpoint( addon_id: PyUUID, # Validate addon ownership of asset asset_id: PyUUID, @@ -1587,9 +1487,7 @@ async def delete_addon_asset_endpoint( deleted_asset_info = await crud.delete_addon_asset(session=db, asset_id=asset_id) if not deleted_asset_info: # Should be caught by prior check - raise HTTPException( - status_code=404, detail="Asset not found during delete operation." - ) + raise HTTPException(status_code=404, detail="Asset not found during delete operation.") # Return 204 No Content by default for DELETE operations # FastAPI will automatically handle returning no body if status_code is 204 @@ -1616,9 +1514,7 @@ async def export_addon_mcaddon(addon_id: PyUUID, db: AsyncSession = Depends(get_ addon_pydantic=addon_details, asset_base_path=crud.BASE_ASSET_PATH ) except Exception as e: - logger.error( - f"Error creating .mcaddon package for addon {addon_id}: {e}", exc_info=True - ) + logger.error(f"Error creating .mcaddon package for addon {addon_id}: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to export addon: {str(e)}") # Sanitize addon name for filename diff --git a/backend/src/services/ai_engine_client.py b/backend/src/services/ai_engine_client.py index c0c2ea63..48ab6211 100644 --- a/backend/src/services/ai_engine_client.py +++ b/backend/src/services/ai_engine_client.py @@ -20,9 +20,7 @@ # AI Engine configuration AI_ENGINE_URL = os.getenv("AI_ENGINE_URL", "http://ai-engine:8001") -AI_ENGINE_TIMEOUT = httpx.Timeout( - 1800.0 -) # 30 minutes timeout for long-running conversions +AI_ENGINE_TIMEOUT = httpx.Timeout(1800.0) # 30 minutes timeout for long-running conversions # Default poll interval for checking conversion status DEFAULT_POLL_INTERVAL = 2.0 # seconds diff --git a/backend/src/services/structured_logging.py b/backend/src/services/structured_logging.py index 85abb1a8..dc1ff56a 100644 --- a/backend/src/services/structured_logging.py +++ b/backend/src/services/structured_logging.py @@ -19,9 +19,7 @@ from structlog.stdlib import ProcessorFormatter # Context variable to store correlation ID across async operations -correlation_id_var: ContextVar[Optional[str]] = ContextVar( - "correlation_id", default=None -) +correlation_id_var: ContextVar[Optional[str]] = ContextVar("correlation_id", default=None) # Context variable to store request metadata request_metadata_var: ContextVar[Optional[Dict[str, Any]]] = ContextVar( @@ -310,9 +308,7 @@ def __enter__(self): # Bind to structlog context structlog.contextvars.clear_contextvars() - structlog.contextvars.bind_contextvars( - correlation_id=self.correlation_id, **self.metadata - ) + structlog.contextvars.bind_contextvars(correlation_id=self.correlation_id, **self.metadata) return self From fd7e94b26228522621f8fb7344c4a2639da09459 Mon Sep 17 00:00:00 2001 From: openhands Date: Sun, 8 Mar 2026 19:58:59 -0400 Subject: [PATCH 6/7] fix: Pin opentelemetry-exporter-jaeger to 1.21.0 for Python 3.11 compatibility Latest version 1.24.0+ requires Python <3.11. This fixes the Prepare Base Images CI failure. --- ai-engine/requirements.txt | 3 ++- backend/requirements.txt | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/ai-engine/requirements.txt b/ai-engine/requirements.txt index b96a5b2d..1e7a5338 100644 --- a/ai-engine/requirements.txt +++ b/ai-engine/requirements.txt @@ -52,10 +52,11 @@ psutil structlog>=24.0.0 # Distributed Tracing - OpenTelemetry +# Note: opentelemetry-exporter-jaeger 1.21.0 is the latest version compatible with Python 3.11 opentelemetry-api>=1.24.0 opentelemetry-sdk>=1.24.0 opentelemetry-exporter-otlp>=1.24.0 -opentelemetry-exporter-jaeger>=1.24.0 +opentelemetry-exporter-jaeger==1.21.0 opentelemetry-instrumentation-fastapi>=0.45b0 opentelemetry-instrumentation-httpx>=0.45b0 opentelemetry-instrumentation-redis>=0.45b0 \ No newline at end of file diff --git a/backend/requirements.txt b/backend/requirements.txt index b260824b..d9dc5e73 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -34,10 +34,11 @@ sentry-sdk[fastapi]>=2.0.0 structlog>=24.0.0 # Distributed Tracing - OpenTelemetry +# Note: opentelemetry-exporter-jaeger 1.21.0 is the latest version compatible with Python 3.11 opentelemetry-api>=1.24.0 opentelemetry-sdk>=1.24.0 opentelemetry-exporter-otlp>=1.24.0 -opentelemetry-exporter-jaeger>=1.24.0 +opentelemetry-exporter-jaeger==1.21.0 opentelemetry-instrumentation-fastapi>=0.45b0 opentelemetry-instrumentation-httpx>=0.45b0 opentelemetry-instrumentation-redis>=0.45b0 From 1efbe4e15a3924edb192eb735c8d278d1ee76cdf Mon Sep 17 00:00:00 2001 From: openhands Date: Sun, 8 Mar 2026 20:29:49 -0400 Subject: [PATCH 7/7] fix(deps): pin redis<6.0.0 to resolve pip dependency resolution error --- ai-engine/requirements.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ai-engine/requirements.txt b/ai-engine/requirements.txt index 1e7a5338..41cbf971 100644 --- a/ai-engine/requirements.txt +++ b/ai-engine/requirements.txt @@ -40,7 +40,8 @@ validators>=0.22.0 # For URL validation (BedrockDocsScraper) ddgs>=5.3.0 # For actual web search functionality (replaces duckduckgo-search) # Redis for job management -redis>=4.5.0 +# Pin redis to avoid resolution issues with complex dependency graph +redis>=4.5.0,<6.0.0 # Configuration Management pydantic>=2.0.0