diff --git a/.factory/tasks.md b/.factory/tasks.md index 3974872e..d32de48a 100644 --- a/.factory/tasks.md +++ b/.factory/tasks.md @@ -4,10 +4,14 @@ - None ## Pending -- None +- Fix PR #775 - distributed-tracing (Format Check fixed, Frontend Tests pre-existing) +- Fix PR #760 - detect-unused-dependencies (multiple CI failures) ## Completed -- None +- ✅ Fix PR #774 - health-check-endpoints (jaeger package fix pushed) +- ✅ Fix PR #775 Format Check (tracing files formatted and pushed) +- ✅ Analyze PR #775 Frontend Tests (pre-existing failures on main branch) +- ✅ Create Issue #776 for pre-existing frontend test failures --- -*Last updated: 2025-11-10* +*Last updated: 2026-03-09* diff --git a/ai-engine/main.py b/ai-engine/main.py index f20a88bb..d2276ed0 100644 --- a/ai-engine/main.py +++ b/ai-engine/main.py @@ -27,12 +27,12 @@ # Also configure structlog for structured JSON logging in production configure_structlog( debug_mode=debug_mode, - json_format=os.getenv("LOG_JSON_FORMAT", "false").lower() == "true" + json_format=os.getenv("LOG_JSON_FORMAT", "false").lower() == "true", ) setup_logging( debug_mode=debug_mode, - enable_file_logging=os.getenv("ENABLE_FILE_LOGGING", "true").lower() == "true" + enable_file_logging=os.getenv("ENABLE_FILE_LOGGING", "true").lower() == "true", ) logger = get_agent_logger("main") @@ -43,7 +43,11 @@ # Import progress callback for real-time updates try: - from utils.progress_callback import create_progress_callback, cleanup_progress_callback + from utils.progress_callback import ( + create_progress_callback, + cleanup_progress_callback, + ) + PROGRESS_CALLBACK_AVAILABLE = True except ImportError: PROGRESS_CALLBACK_AVAILABLE = False @@ -59,14 +63,16 @@ if debug_mode: print_gpu_info() + # Status enumeration for conversion states class ConversionStatusEnum(str, Enum): QUEUED = "queued" - IN_PROGRESS = "in_progress" + IN_PROGRESS = "in_progress" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" + # FastAPI app configuration app 
= FastAPI( title="ModPorter AI Engine", @@ -84,7 +90,9 @@ class ConversionStatusEnum(str, Enum): ) # CORS middleware - Restrict origins for security -allowed_origins = os.getenv("ALLOWED_ORIGINS", "http://localhost:3000,http://localhost:8080").split(",") +allowed_origins = os.getenv( + "ALLOWED_ORIGINS", "http://localhost:3000,http://localhost:8080" +).split(",") app.add_middleware( CORSMiddleware, allow_origins=allowed_origins, @@ -98,55 +106,72 @@ class ConversionStatusEnum(str, Enum): assumption_engine = None redis_client = None + # Redis job state management class RedisJobManager: def __init__(self, redis_client): self.redis = redis_client self.available = True - + async def set_job_status(self, job_id: str, status: "ConversionStatus") -> None: """Store job status in Redis with error handling""" try: if not self.available: - raise HTTPException(status_code=503, detail="Job state storage unavailable") - + raise HTTPException( + status_code=503, detail="Job state storage unavailable" + ) + status_dict = status.model_dump() - status_dict['started_at'] = status_dict['started_at'].isoformat() if status_dict['started_at'] else None - status_dict['completed_at'] = status_dict['completed_at'].isoformat() if status_dict['completed_at'] else None - + status_dict["started_at"] = ( + status_dict["started_at"].isoformat() + if status_dict["started_at"] + else None + ) + status_dict["completed_at"] = ( + status_dict["completed_at"].isoformat() + if status_dict["completed_at"] + else None + ) + await self.redis.set( - f"ai_engine:jobs:{job_id}", + f"ai_engine:jobs:{job_id}", json.dumps(status_dict), - ex=3600 # Expire after 1 hour + ex=3600, # Expire after 1 hour ) except Exception as e: logger.error(f"Failed to store job status in Redis: {e}", exc_info=True) self.available = False raise HTTPException(status_code=503, detail="Job state storage failed") - + async def get_job_status(self, job_id: str) -> Optional["ConversionStatus"]: """Retrieve job status from Redis with error 
handling""" try: if not self.available: return None - + data = await self.redis.get(f"ai_engine:jobs:{job_id}") if not data: return None - + status_dict = json.loads(data) # Convert ISO strings back to datetime - if status_dict.get('started_at'): - status_dict['started_at'] = datetime.fromisoformat(status_dict['started_at']) - if status_dict.get('completed_at'): - status_dict['completed_at'] = datetime.fromisoformat(status_dict['completed_at']) - + if status_dict.get("started_at"): + status_dict["started_at"] = datetime.fromisoformat( + status_dict["started_at"] + ) + if status_dict.get("completed_at"): + status_dict["completed_at"] = datetime.fromisoformat( + status_dict["completed_at"] + ) + return ConversionStatus(**status_dict) except Exception as e: - logger.error(f"Failed to retrieve job status from Redis: {e}", exc_info=True) + logger.error( + f"Failed to retrieve job status from Redis: {e}", exc_info=True + ) self.available = False return None - + async def delete_job(self, job_id: str) -> None: """Remove job from Redis""" try: @@ -155,32 +180,45 @@ async def delete_job(self, job_id: str) -> None: except Exception as e: logger.error(f"Failed to delete job from Redis: {e}", exc_info=True) + job_manager = None + # Pydantic models class HealthResponse(BaseModel): """Health check response model""" + status: str version: str timestamp: str services: Dict[str, str] + class ConversionRequest(BaseModel): """Conversion request model""" + job_id: str = Field(..., description="Unique job identifier") mod_file_path: str = Field(..., description="Path to the mod file") - conversion_options: Optional[Dict[str, Any]] = Field(default={}, description="Conversion options") - experiment_variant: Optional[str] = Field(default=None, description="Experiment variant ID for A/B testing") + conversion_options: Optional[Dict[str, Any]] = Field( + default={}, description="Conversion options" + ) + experiment_variant: Optional[str] = Field( + default=None, description="Experiment 
variant ID for A/B testing" + ) + class ConversionResponse(BaseModel): """Conversion response model""" + job_id: str status: str message: str estimated_time: Optional[int] = None + class ConversionStatus(BaseModel): """Conversion status model""" + job_id: str status: str progress: int @@ -189,70 +227,73 @@ class ConversionStatus(BaseModel): started_at: Optional[datetime] = None completed_at: Optional[datetime] = None + # Job storage is now handled by RedisJobManager - no global dict + @app.on_event("startup") async def startup_event(): """Initialize services on startup""" global conversion_crew, assumption_engine, redis_client, job_manager - + logger.info("Starting ModPorter AI Engine...") - + try: # Initialize Redis connection redis_url = os.getenv("REDIS_URL", "redis://localhost:6379") redis_client = aioredis.from_url(redis_url, decode_responses=True) - + # Test Redis connection await redis_client.ping() logger.info("Redis connection established") - + # Initialize job manager job_manager = RedisJobManager(redis_client) logger.info("RedisJobManager initialized") - + # Initialize SmartAssumptionEngine assumption_engine = SmartAssumptionEngine() logger.info("SmartAssumptionEngine initialized") - + # Note: We now initialize the conversion crew per request to support variants # The global conversion_crew will remain None - + logger.info("ModPorter AI Engine startup complete") - + except Exception as e: logger.error(f"Failed to initialize AI Engine: {e}", exc_info=True) raise HTTPException(status_code=503, detail="Service initialization failed") + @app.get("/api/v1/health", response_model=HealthResponse, tags=["health"]) async def health_check(): """Check the health status of the AI Engine""" services = { "assumption_engine": "healthy" if assumption_engine else "unavailable", } - + # Conversion crew is now initialized per request, so we don't check it here - + return HealthResponse( status="healthy", version="1.0.0", timestamp=datetime.utcnow().isoformat(), - 
services=services + services=services, ) + @app.post("/api/v1/convert", response_model=ConversionResponse, tags=["conversion"]) async def start_conversion( - request: ConversionRequest, - background_tasks: BackgroundTasks + request: ConversionRequest, background_tasks: BackgroundTasks ): """Start a new mod conversion job""" - + if not job_manager or not job_manager.available: raise HTTPException(status_code=503, detail="Job state storage unavailable") - + # Initialize conversion crew with variant if specified # The crew is now initialized in the background task to avoid blocking the request - + # Create job status job_status = ConversionStatus( job_id=request.job_id, @@ -260,75 +301,89 @@ async def start_conversion( progress=0, current_stage="initialization", message="Conversion job queued", - started_at=datetime.utcnow() + started_at=datetime.utcnow(), ) - + # Store in Redis instead of global dict await job_manager.set_job_status(request.job_id, job_status) - + # Start conversion in background background_tasks.add_task( process_conversion, request.job_id, request.mod_file_path, request.conversion_options, - request.experiment_variant # Pass variant to process_conversion + request.experiment_variant, # Pass variant to process_conversion ) - + logger.info(f"Started conversion job {request.job_id}") - + return ConversionResponse( job_id=request.job_id, status="queued", message="Conversion job started", - estimated_time=120 # Placeholder - would be calculated based on mod size + estimated_time=120, # Placeholder - would be calculated based on mod size ) -@app.get("/api/v1/status/{job_id}", response_model=ConversionStatus, tags=["conversion"]) + +@app.get( + "/api/v1/status/{job_id}", response_model=ConversionStatus, tags=["conversion"] +) async def get_conversion_status(job_id: str): """Get the status of a conversion job""" - + if not job_manager: raise HTTPException(status_code=503, detail="Job state storage unavailable") - + job_status = await 
job_manager.get_job_status(job_id) if not job_status: raise HTTPException(status_code=404, detail="Job not found") - + return job_status + @app.get("/api/v1/jobs", response_model=List[ConversionStatus], tags=["conversion"]) async def list_jobs(): """List all active conversion jobs""" if not job_manager or not job_manager.available: raise HTTPException(status_code=503, detail="Job state storage unavailable") - + # Note: In production, implement pagination and filtering # For now, return empty list as Redis doesn't have easy "list all" without keys - logger.warning("list_jobs endpoint returns empty - implement Redis SCAN for production") + logger.warning( + "list_jobs endpoint returns empty - implement Redis SCAN for production" + ) return [] -async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, Any], experiment_variant: Optional[str] = None): + +async def process_conversion( + job_id: str, + mod_file_path: str, + options: Dict[str, Any], + experiment_variant: Optional[str] = None, +): """Process a conversion job using the AI crew""" - + progress_callback = None - + try: # Get current job status job_status = await job_manager.get_job_status(job_id) if not job_status: logger.error(f"Job {job_id} not found during processing") return - + # Update job status job_status.status = "processing" job_status.current_stage = "analysis" job_status.message = "Analyzing mod structure" job_status.progress = 10 await job_manager.set_job_status(job_id, job_status) - - logger.info(f"Processing conversion for job {job_id} with variant {experiment_variant}") - + + logger.info( + f"Processing conversion for job {job_id} with variant {experiment_variant}" + ) + # Create progress callback for real-time updates if available if PROGRESS_CALLBACK_AVAILABLE and create_progress_callback: try: @@ -336,25 +391,29 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, logger.info(f"Progress callback created for job {job_id}") except Exception 
as e: logger.warning(f"Failed to create progress callback: {e}") - + # Prepare output path output_path = options.get("output_path") if not output_path: # Default output path using job_id pattern that backend expects # Use the mounted volume path inside the container - output_path = os.path.join(os.getenv("CONVERSION_OUTPUT_DIR", "/app/conversion_outputs"), f"{job_id}_converted.mcaddon") - + output_path = os.path.join( + os.getenv("CONVERSION_OUTPUT_DIR", "/app/conversion_outputs"), + f"{job_id}_converted.mcaddon", + ) + # Ensure the output directory exists os.makedirs(os.path.dirname(output_path), exist_ok=True) - + try: # Initialize conversion crew with variant if specified and pass progress callback crew = ModPorterConversionCrew( - variant_id=experiment_variant, - progress_callback=progress_callback + variant_id=experiment_variant, progress_callback=progress_callback + ) + logger.info( + f"ModPorterConversionCrew initialized with variant: {experiment_variant}" ) - logger.info(f"ModPorterConversionCrew initialized with variant: {experiment_variant}") - + # Update status for analysis stage job_status = await job_manager.get_job_status(job_id) if job_status: @@ -362,16 +421,17 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, job_status.message = "Analyzing Java mod structure" job_status.progress = 20 await job_manager.set_job_status(job_id, job_status) - + # Execute the actual AI conversion using the conversion crew from pathlib import Path + conversion_result = crew.convert_mod( mod_path=Path(mod_file_path), output_path=Path(output_path), smart_assumptions=options.get("smart_assumptions", True), - include_dependencies=options.get("include_dependencies", True) + include_dependencies=options.get("include_dependencies", True), ) - + # Update progress based on conversion result if conversion_result.get("status") == "failed": # Mark job as failed @@ -380,9 +440,11 @@ async def process_conversion(job_id: str, mod_file_path: str, 
options: Dict[str, job_status.status = "failed" job_status.message = f"Conversion failed: {conversion_result.get('error', 'Unknown error')}" await job_manager.set_job_status(job_id, job_status) - logger.error(f"Conversion failed for job {job_id}: {conversion_result.get('error')}") + logger.error( + f"Conversion failed for job {job_id}: {conversion_result.get('error')}" + ) return - + # Update progress through conversion stages stages = [ ("planning", "Creating conversion plan", 40), @@ -391,7 +453,7 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, ("packaging", "Packaging Bedrock addon", 90), ("validation", "Validating conversion", 95), ] - + for stage, message, progress in stages: job_status = await job_manager.get_job_status(job_id) if job_status: @@ -399,16 +461,21 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, job_status.message = message job_status.progress = progress await job_manager.set_job_status(job_id, job_status) - + # Short delay to show progress import asyncio + await asyncio.sleep(0.5) - + # Verify output file was created if not os.path.exists(output_path): - logger.error(f"Output file not created by conversion crew: {output_path}") - logger.error("This indicates a serious conversion failure that should not be masked") - + logger.error( + f"Output file not created by conversion crew: {output_path}" + ) + logger.error( + "This indicates a serious conversion failure that should not be masked" + ) + # Mark job as failed explicitly instead of creating a fake successful output job_status = await job_manager.get_job_status(job_id) if job_status: @@ -416,9 +483,9 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, job_status.message = "Conversion crew failed to produce output file - this indicates a serious error in the conversion process" await job_manager.set_job_status(job_id, job_status) return - + logger.info(f"Conversion completed successfully: 
{output_path}") - + except Exception as conversion_error: logger.error(f"Failed to convert mod {mod_file_path}: {conversion_error}") # Mark job as failed if conversion fails @@ -436,12 +503,12 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, job_status.message = "Conversion completed successfully" job_status.completed_at = datetime.utcnow() await job_manager.set_job_status(job_id, job_status) - + logger.info(f"Completed conversion for job {job_id}") - + except Exception as e: logger.error(f"Conversion failed for job {job_id}: {e}", exc_info=True) - + # Update job status to failed job_status = await job_manager.get_job_status(job_id) if job_status: @@ -450,20 +517,28 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, try: await job_manager.set_job_status(job_id, job_status) except Exception as status_error: - logger.error(f"Failed to update job status after error: {status_error}", exc_info=True) + logger.error( + f"Failed to update job status after error: {status_error}", + exc_info=True, + ) finally: # Clean up progress callback - if progress_callback and PROGRESS_CALLBACK_AVAILABLE and cleanup_progress_callback: + if ( + progress_callback + and PROGRESS_CALLBACK_AVAILABLE + and cleanup_progress_callback + ): try: await cleanup_progress_callback(job_id) logger.info(f"Cleaned up progress callback for job {job_id}") except Exception as e: logger.warning(f"Failed to cleanup progress callback: {e}") + if __name__ == "__main__": uvicorn.run( "main:app", host=os.getenv("HOST", "0.0.0.0"), port=int(os.getenv("PORT", 8001)), - reload=os.getenv("DEBUG", "false").lower() == "true" + reload=os.getenv("DEBUG", "false").lower() == "true", ) diff --git a/ai-engine/tracing.py b/ai-engine/tracing.py new file mode 100644 index 00000000..d56a9009 --- /dev/null +++ b/ai-engine/tracing.py @@ -0,0 +1,320 @@ +""" +Distributed Tracing Service using OpenTelemetry for AI Engine. 
+ +This module provides tracing capabilities for the AI Engine service, +including trace context propagation between services. +""" + +import os +from typing import Optional +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter +from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.exporter.jaeger.thrift import JaegerExporter +from opentelemetry.sdk.extension.aws.resource.ec2 import AwsEc2ResourceDetector +from opentelemetry.sdk.extension.aws.resource.ecs import AwsEcsResourceDetector +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor +from opentelemetry.instrumentation.redis import RedisInstrumentor +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from opentelemetry.trace import Status, StatusCode +from opentelemetry.context import Context +import logging + +logger = logging.getLogger(__name__) + +# Trace context propagator (W3C Trace Context) +tracer_propagator = TraceContextTextMapPropagator() + +# Global tracer instance +_tracer: Optional[trace.Tracer] = None +_tracer_provider: Optional[TracerProvider] = None + + +def get_tracer(service_name: str = "modporter-ai-engine") -> trace.Tracer: + """ + Get or create a tracer instance for the given service. 
+ + Args: + service_name: Name of the service for tracing + + Returns: + Configured tracer instance + """ + global _tracer, _tracer_provider + + if _tracer is not None: + return _tracer + + # Create resource with service information + service_version = os.getenv("SERVICE_VERSION", "1.0.0") + resource = Resource.create( + { + SERVICE_NAME: service_name, + SERVICE_VERSION: service_version, + } + ) + + # Add cloud metadata if available + try: + ec2_resource = AwsEc2ResourceDetector().detect() + resource = resource.merge(ec2_resource) + except Exception: + pass + + try: + ecs_resource = AwsEcsResourceDetector().detect() + resource = resource.merge(ecs_resource) + except Exception: + pass + + # Create tracer provider + _tracer_provider = TracerProvider(resource=resource) + + # Configure exporter based on environment + tracing_enabled = os.getenv("TRACING_ENABLED", "true").lower() == "true" + tracing_exporter = os.getenv("TRACING_EXPORTER", "jaeger").lower() + + if tracing_enabled: + if tracing_exporter == "jaeger": + # Jaeger exporter configuration + jaeger_host = os.getenv("JAEGER_HOST", "localhost") + jaeger_port = int(os.getenv("JAEGER_PORT", "6831")) + + jaeger_exporter = JaegerExporter( + agent_host_name=jaeger_host, + agent_port=jaeger_port, + ) + _tracer_provider.add_span_processor(BatchSpanProcessor(jaeger_exporter)) + logger.info(f"Jaeger tracing enabled: {jaeger_host}:{jaeger_port}") + + elif tracing_exporter == "otlp": + # OTLP exporter configuration + otlp_endpoint = os.getenv("OTLP_ENDPOINT", "http://localhost:4317") + + otlp_exporter = OTLPSpanExporter( + endpoint=otlp_endpoint, + insecure=True, + ) + _tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter)) + logger.info(f"OTLP tracing enabled: {otlp_endpoint}") + + # Add console exporter for development + if os.getenv("TRACING_CONSOLE", "false").lower() == "true": + _tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter())) + logger.info("Console span exporter enabled") + 
+ # Set the global tracer provider + trace.set_tracer_provider(_tracer_provider) + + # Create and return tracer + _tracer = trace.get_tracer(service_name) + + logger.info(f"Tracing initialized for service: {service_name}") + + return _tracer + + +def init_tracing( + app=None, + service_name: str = "modporter-ai-engine", + instrument_fastapi: bool = True, + instrument_httpx: bool = True, + instrument_redis: bool = True, +) -> trace.Tracer: + """ + Initialize tracing with automatic instrumentation. + + Args: + app: FastAPI application instance (optional) + service_name: Name of the service + instrument_fastapi: Whether to instrument FastAPI + instrument_httpx: Whether to instrument HTTPX + instrument_redis: Whether to instrument Redis + + Returns: + Configured tracer instance + """ + tracer = get_tracer(service_name) + + # Instrument FastAPI if app provided + if app and instrument_fastapi: + try: + FastAPIInstrumentor.instrument_app(app) + logger.info("FastAPI instrumentation enabled") + except Exception as e: + logger.warning(f"Failed to instrument FastAPI: {e}") + + # Instrument HTTPX + if instrument_httpx: + try: + HTTPXClientInstrumentor().instrument() + logger.info("HTTPX instrumentation enabled") + except Exception as e: + logger.warning(f"Failed to instrument HTTPX: {e}") + + # Instrument Redis + if instrument_redis: + try: + RedisInstrumentor().instrument() + logger.info("Redis instrumentation enabled") + except Exception as e: + logger.warning(f"Failed to instrument Redis: {e}") + + return tracer + + +def extract_trace_context(carrier: dict) -> Context: + """ + Extract trace context from carrier (e.g., HTTP headers). + + Args: + carrier: Dictionary containing trace context (e.g., HTTP headers) + + Returns: + Extracted context + """ + return tracer_propagator.extract(carrier) + + +def inject_trace_context(carrier: dict) -> dict: + """ + Inject trace context into carrier (e.g., HTTP headers). 
+ + Args: + carrier: Dictionary to inject trace context into + + Returns: + Carrier with injected trace context + """ + tracer_propagator.inject(carrier) + return carrier + + +def create_span( + name: str, + context: Optional[Context] = None, + kind: trace.SpanKind = trace.SpanKind.INTERNAL, +) -> trace.Span: + """ + Create a new span with the given name and context. + + Args: + name: Name of the span + context: Parent context (optional) + kind: Span kind + + Returns: + New span + """ + tracer = get_tracer() + + if context: + with tracer.start_as_current_span(name, context=context, kind=kind) as span: + return span + else: + with tracer.start_as_current_span(name, kind=kind) as span: + return span + + +def add_span_attributes(span: trace.Span, attributes: dict) -> None: + """ + Add attributes to a span. + + Args: + span: Span to add attributes to + attributes: Dictionary of attributes + """ + for key, value in attributes.items(): + if value is not None: + span.set_attribute(key, str(value)) + + +def record_span_exception(span: trace.Span, exception: Exception) -> None: + """ + Record an exception on a span. + + Args: + span: Span to record exception on + exception: Exception to record + """ + span.set_status(Status(StatusCode.ERROR, str(exception))) + span.record_exception(exception) + + +def shutdown_tracing() -> None: + """Shutdown the tracing provider and flush any pending spans.""" + global _tracer_provider + + if _tracer_provider: + _tracer_provider.shutdown() + logger.info("Tracing provider shutdown") + + +class TracingMiddleware: + """ + Middleware for FastAPI to handle trace context propagation. + + This middleware extracts trace context from incoming requests + and injects it into outgoing requests. 
+ """ + + def __init__(self, app): + self.app = app + + async def __call__(self, scope, receive, send): + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + # Extract trace context from headers + headers = dict(scope.get("headers", [])) + # Convert bytes to string for headers + headers = {k.decode(): v.decode() for k, v in headers.items()} + + # The FastAPI instrumentation will handle this automatically, + # but we keep this for custom use cases + context = extract_trace_context(headers) + + # Continue with the request + await self.app(scope, receive, send) + + +def get_current_span() -> Optional[trace.Span]: + """ + Get the current active span if any. + + Returns: + Current span or None + """ + return trace.get_current_span() + + +def get_trace_id() -> Optional[str]: + """ + Get the current trace ID as a hex string. + + Returns: + Trace ID or None + """ + span = get_current_span() + if span: + trace_id = span.get_span_context().trace_id + return format(trace_id, "032x") + return None + + +def get_span_id() -> Optional[str]: + """ + Get the current span ID as a hex string. + + Returns: + Span ID or None + """ + span = get_current_span() + if span: + span_id = span.get_span_context().span_id + return format(span_id, "016x") + return None diff --git a/ai-engine/utils/logging_config.py b/ai-engine/utils/logging_config.py index 713c5c0e..0a9c2d0e 100644 --- a/ai-engine/utils/logging_config.py +++ b/ai-engine/utils/logging_config.py @@ -27,7 +27,9 @@ import json # Context variable for correlation ID -correlation_id_var: ContextVar[Optional[str]] = ContextVar("correlation_id", default=None) +correlation_id_var: ContextVar[Optional[str]] = ContextVar( + "correlation_id", default=None +) def configure_structlog( @@ -38,7 +40,7 @@ def configure_structlog( ): """ Configure structlog for the AI engine. 
- + Args: log_level: Logging level (DEBUG, INFO, WARNING, ERROR) log_file: Path to log file (optional) @@ -47,16 +49,16 @@ def configure_structlog( """ if log_level is None: log_level = os.getenv("LOG_LEVEL", "INFO").upper() - + # Auto-detect JSON format in production if json_format is None: json_format = os.getenv("LOG_JSON_FORMAT", "false").lower() == "true" if os.getenv("ENVIRONMENT", "development") == "production": json_format = True - + # Get log directory log_dir = os.getenv("LOG_DIR", "/tmp/modporter-ai/logs") - + # Configure processors based on format processors = [ structlog.contextvars.merge_contextvars, @@ -65,18 +67,18 @@ def configure_structlog( structlog.stdlib.PositionalArgumentsFormatter(), structlog.processors.TimeStamper(fmt="iso"), ] - + if debug_mode: processors.append(structlog.dev.ConsoleRenderer()) elif json_format: processors.append(structlog.processors.JSONRenderer()) else: processors.append(structlog.dev.ConsoleRenderer(colors=False)) - + # Add exception info processor processors.append(structlog.processors.StackInfoRenderer()) processors.append(structlog.processors.format_exc_info) - + # Configure structlog structlog.configure( processors=processors, @@ -85,160 +87,174 @@ def configure_structlog( logger_factory=structlog.stdlib.LoggerFactory(), cache_logger_on_first_use=True, ) - + # Also configure standard library logging root_logger = logging.getLogger() root_logger.setLevel(getattr(logging, log_level, logging.INFO)) - + # Clear existing handlers root_logger.handlers.clear() - + # Console handler console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(getattr(logging, log_level, logging.INFO)) - console_handler.setFormatter(logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s", - datefmt="%Y-%m-%d %H:%M:%S" - )) + console_handler.setFormatter( + logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + ) 
root_logger.addHandler(console_handler) - + # File handler for production if log_file is None: os.makedirs(log_dir, exist_ok=True) log_file = os.path.join(log_dir, "ai-engine.log") - + file_handler = logging.handlers.RotatingFileHandler( - log_file, - maxBytes=10 * 1024 * 1024, # 10MB - backupCount=5, - encoding='utf-8' + log_file, maxBytes=10 * 1024 * 1024, backupCount=5, encoding="utf-8" # 10MB ) file_handler.setLevel(logging.INFO) - file_handler.setFormatter(logging.Formatter( - "%(message)s" - )) + file_handler.setFormatter(logging.Formatter("%(message)s")) root_logger.addHandler(file_handler) - + return structlog.get_logger() class AgentLogFormatter(logging.Formatter): """Custom formatter for agent logging with structured output""" - + def __init__(self, include_agent_context: bool = True): self.include_agent_context = include_agent_context super().__init__() - + def format(self, record: logging.LogRecord) -> str: # Base format with timestamp, level, and logger name - timestamp = datetime.fromtimestamp(record.created).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] - + timestamp = datetime.fromtimestamp(record.created).strftime( + "%Y-%m-%d %H:%M:%S.%f" + )[:-3] + # Extract agent name from logger name (e.g., 'agents.java_analyzer' -> 'JavaAnalyzer') - logger_parts = record.name.split('.') - if 'agents' in logger_parts: - agent_idx = logger_parts.index('agents') + logger_parts = record.name.split(".") + if "agents" in logger_parts: + agent_idx = logger_parts.index("agents") if agent_idx + 1 < len(logger_parts): - agent_name = logger_parts[agent_idx + 1].replace('_', '').title() + agent_name = logger_parts[agent_idx + 1].replace("_", "").title() else: - agent_name = 'Agent' - elif 'crew' in logger_parts: - agent_name = 'Crew' + agent_name = "Agent" + elif "crew" in logger_parts: + agent_name = "Crew" else: - agent_name = record.name.split('.')[-1].title() - + agent_name = record.name.split(".")[-1].title() + # Build the log message - base_msg = f"{timestamp} 
[{record.levelname}] {agent_name}: {record.getMessage()}" - + base_msg = ( + f"{timestamp} [{record.levelname}] {agent_name}: {record.getMessage()}" + ) + # Add extra context if available - if hasattr(record, 'agent_context') and self.include_agent_context: + if hasattr(record, "agent_context") and self.include_agent_context: context = record.agent_context if isinstance(context, dict): - context_str = json.dumps(context, separators=(',', ':')) + context_str = json.dumps(context, separators=(",", ":")) base_msg += f" | Context: {context_str}" - + # Add operation timing if available - if hasattr(record, 'operation_time'): + if hasattr(record, "operation_time"): base_msg += f" | Duration: {record.operation_time:.3f}s" - + # Add tool usage information if available - if hasattr(record, 'tool_name'): + if hasattr(record, "tool_name"): base_msg += f" | Tool: {record.tool_name}" - if hasattr(record, 'tool_result'): + if hasattr(record, "tool_result"): result_preview = str(record.tool_result)[:100] if len(str(record.tool_result)) > 100: result_preview += "..." 
base_msg += f" | Result: {result_preview}" - + return base_msg class AgentLogger: """Enhanced logger for agent operations with structured logging""" - + def __init__(self, name: str): self.logger = logging.getLogger(name) - self.agent_name = name.split('.')[-1].replace('_', '').title() - + self.agent_name = name.split(".")[-1].replace("_", "").title() + def info(self, message: str, **kwargs): """Log info message with optional context""" extra = self._build_extra(**kwargs) self.logger.info(message, extra=extra) - + def debug(self, message: str, **kwargs): """Log debug message with optional context""" extra = self._build_extra(**kwargs) self.logger.debug(message, extra=extra) - + def warning(self, message: str, **kwargs): """Log warning message with optional context""" extra = self._build_extra(**kwargs) self.logger.warning(message, extra=extra) - + def error(self, message: str, **kwargs): """Log error message with optional context""" extra = self._build_extra(**kwargs) self.logger.error(message, extra=extra) - + def log_operation_start(self, operation: str, **context): """Log the start of an operation""" self.info(f"Starting {operation}", agent_context=context) - + def log_operation_complete(self, operation: str, duration: float, **context): """Log the completion of an operation with timing""" - self.info(f"Completed {operation}", operation_time=duration, agent_context=context) - - def log_tool_usage(self, tool_name: str, result: Any = None, duration: Optional[float] = None): + self.info( + f"Completed {operation}", operation_time=duration, agent_context=context + ) + + def log_tool_usage( + self, tool_name: str, result: Any = None, duration: Optional[float] = None + ): """Log tool usage with results""" - kwargs = {'tool_name': tool_name} + kwargs = {"tool_name": tool_name} if result is not None: - kwargs['tool_result'] = result + kwargs["tool_result"] = result if duration is not None: - kwargs['operation_time'] = duration - + kwargs["operation_time"] = duration + 
self.info(f"Used tool: {tool_name}", **kwargs) - + def log_agent_decision(self, decision: str, reasoning: str, **context): """Log agent decision-making process""" log_context = context.copy() - log_context['decision'] = decision - log_context['reasoning'] = reasoning + log_context["decision"] = decision + log_context["reasoning"] = reasoning self.info(f"Decision: {decision}", agent_context=log_context) - - def log_data_transfer(self, from_agent: str, to_agent: str, data_type: str, data_size: Optional[int] = None): + + def log_data_transfer( + self, + from_agent: str, + to_agent: str, + data_type: str, + data_size: Optional[int] = None, + ): """Log data transfer between agents""" context = { - 'from_agent': from_agent, - 'to_agent': to_agent, - 'data_type': data_type + "from_agent": from_agent, + "to_agent": to_agent, + "data_type": data_type, } if data_size is not None: - context['data_size'] = data_size - - self.info(f"Data transfer: {data_type} from {from_agent} to {to_agent}", agent_context=context) - + context["data_size"] = data_size + + self.info( + f"Data transfer: {data_type} from {from_agent} to {to_agent}", + agent_context=context, + ) + def _build_extra(self, **kwargs) -> Dict[str, Any]: """Build extra dictionary for logging""" - valid_keys = {'agent_context', 'operation_time', 'tool_name', 'tool_result'} + valid_keys = {"agent_context", "operation_time", "tool_name", "tool_result"} extra = {key: value for key, value in kwargs.items() if key in valid_keys} return extra @@ -249,11 +265,11 @@ def setup_logging( enable_file_logging: bool = True, debug_mode: bool = False, max_file_size: int = 10 * 1024 * 1024, # 10MB - backup_count: int = 5 + backup_count: int = 5, ) -> None: """ Setup centralized logging configuration for the AI engine - + Args: log_level: Logging level (DEBUG, INFO, WARNING, ERROR) log_file: Path to log file (optional) @@ -262,30 +278,30 @@ def setup_logging( max_file_size: Maximum size of log file before rotation backup_count: Number of 
backup log files to keep """ - + # Determine log level if log_level is None: log_level = os.getenv("LOG_LEVEL", "DEBUG" if debug_mode else "INFO").upper() - + # Convert string level to logging constant numeric_level = getattr(logging, log_level, logging.INFO) - + # Create custom formatter formatter = AgentLogFormatter(include_agent_context=debug_mode) - + # Configure root logger root_logger = logging.getLogger() root_logger.setLevel(numeric_level) - + # Clear existing handlers root_logger.handlers.clear() - + # Console handler console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(numeric_level) console_handler.setFormatter(formatter) root_logger.addHandler(console_handler) - + # File handler (if enabled) if enable_file_logging: if log_file is None: @@ -293,36 +309,37 @@ def setup_logging( log_dir = Path(os.getenv("LOG_DIR", "/tmp/modporter-ai/logs")) log_dir.mkdir(parents=True, exist_ok=True) log_file = log_dir / "ai-engine.log" - + # Use rotating file handler to prevent huge log files file_handler = logging.handlers.RotatingFileHandler( - log_file, - maxBytes=max_file_size, - backupCount=backup_count, - encoding='utf-8' + log_file, maxBytes=max_file_size, backupCount=backup_count, encoding="utf-8" ) file_handler.setLevel(numeric_level) file_handler.setFormatter(formatter) root_logger.addHandler(file_handler) - + # Set specific logger levels for third-party libraries to reduce noise logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING) - logging.getLogger("crewai").setLevel(logging.INFO if debug_mode else logging.WARNING) - + logging.getLogger("crewai").setLevel( + logging.INFO if debug_mode else logging.WARNING + ) + # Log the configuration logger = logging.getLogger(__name__) - logger.info(f"Logging configured - Level: {log_level}, Debug: {debug_mode}, File: {enable_file_logging}") + logger.info( + f"Logging configured - Level: 
{log_level}, Debug: {debug_mode}, File: {enable_file_logging}" + ) def get_agent_logger(agent_name: str) -> AgentLogger: """ Get a configured logger for an agent - + Args: agent_name: Name of the agent (e.g., 'java_analyzer', 'logic_translator') - + Returns: AgentLogger instance """ @@ -333,10 +350,10 @@ def get_agent_logger(agent_name: str) -> AgentLogger: def get_structlog_logger(name: str = None) -> structlog.BoundLogger: """ Get a structlog logger instance. - + Args: name: Logger name (optional) - + Returns: Configured structlog logger """ @@ -348,16 +365,16 @@ def get_structlog_logger(name: str = None) -> structlog.BoundLogger: def set_correlation_id(correlation_id: Optional[str] = None) -> str: """ Set the correlation ID for the current context. - + Args: correlation_id: Optional correlation ID to use - + Returns: The correlation ID (either provided or generated) """ if correlation_id is None: correlation_id = str(uuid.uuid4()) - + correlation_id_var.set(correlation_id) structlog.contextvars.clear_contextvars() structlog.contextvars.bind_contextvars(correlation_id=correlation_id) @@ -367,7 +384,7 @@ def set_correlation_id(correlation_id: Optional[str] = None) -> str: def get_correlation_id() -> Optional[str]: """ Get the current correlation ID from the context. 
- + Returns: Current correlation ID or None """ @@ -390,20 +407,21 @@ def get_crew_logger() -> AgentLogger: # Performance timing decorator def log_performance(operation_name: str = None): """Decorator to log operation performance""" + def decorator(func): def wrapper(*args, **kwargs): - + # Get logger from the class instance if available - if args and hasattr(args[0], 'logger'): + if args and hasattr(args[0], "logger"): logger = args[0].logger else: - logger = get_agent_logger(func.__module__.split('.')[-1]) - + logger = get_agent_logger(func.__module__.split(".")[-1]) + op_name = operation_name or func.__name__ start_time = time.time() - + logger.log_operation_start(op_name) - + try: result = func(*args, **kwargs) duration = time.time() - start_time @@ -411,10 +429,13 @@ def wrapper(*args, **kwargs): return result except Exception as e: duration = time.time() - start_time - logger.error(f"Operation {op_name} failed after {duration:.3f}s: {str(e)}") + logger.error( + f"Operation {op_name} failed after {duration:.3f}s: {str(e)}" + ) raise - + return wrapper + return decorator @@ -423,12 +444,15 @@ def wrapper(*args, **kwargs): # ============================================================================ # Context variable for tracking operation context across async boundaries -_operation_context: ContextVar[Dict[str, Any]] = ContextVar('operation_context', default={}) +_operation_context: ContextVar[Dict[str, Any]] = ContextVar( + "operation_context", default={} +) @dataclass class AgentDecision: """Represents an agent decision for logging and analysis""" + agent_name: str decision_type: str decision: str @@ -443,6 +467,7 @@ class AgentDecision: @dataclass class ToolUsage: """Represents tool usage for logging and analysis""" + agent_name: str tool_name: str input_params: Dict[str, Any] @@ -458,32 +483,32 @@ class AgentLogAnalyzer: Log analysis tools for agent operations. Provides insights into agent behavior, decisions, and performance. 
""" - + def __init__(self): self.decisions: List[AgentDecision] = [] self.tool_usages: List[ToolUsage] = [] self._lock = threading.Lock() - + def record_decision(self, decision: AgentDecision): """Record an agent decision for analysis""" with self._lock: self.decisions.append(decision) - + def record_tool_usage(self, usage: ToolUsage): """Record tool usage for analysis""" with self._lock: self.tool_usages.append(usage) - + def get_agent_statistics(self, agent_name: str = None) -> Dict[str, Any]: """Get statistics for a specific agent or all agents""" with self._lock: decisions = self.decisions tool_usages = self.tool_usages - + if agent_name: decisions = [d for d in decisions if d.agent_name == agent_name] tool_usages = [t for t in tool_usages if t.agent_name == agent_name] - + # Calculate decision statistics decision_types = defaultdict(int) avg_confidence = 0.0 @@ -492,79 +517,89 @@ def get_agent_statistics(self, agent_name: str = None) -> Dict[str, Any]: decision_types[d.decision_type] += 1 if d.confidence is not None: avg_confidence += d.confidence - avg_confidence /= len([d for d in decisions if d.confidence is not None]) if decisions else 1 - + avg_confidence /= ( + len([d for d in decisions if d.confidence is not None]) + if decisions + else 1 + ) + # Calculate tool usage statistics tool_counts = defaultdict(int) - tool_success_rate = defaultdict(lambda: {'success': 0, 'total': 0}) + tool_success_rate = defaultdict(lambda: {"success": 0, "total": 0}) avg_tool_duration = defaultdict(list) - + for t in tool_usages: tool_counts[t.tool_name] += 1 - tool_success_rate[t.tool_name]['total'] += 1 + tool_success_rate[t.tool_name]["total"] += 1 if t.success: - tool_success_rate[t.tool_name]['success'] += 1 + tool_success_rate[t.tool_name]["success"] += 1 avg_tool_duration[t.tool_name].append(t.duration_ms) - + # Calculate average durations avg_durations = {} for tool, durations in avg_tool_duration.items(): avg_durations[tool] = sum(durations) / len(durations) if 
durations else 0 - + # Calculate success rates success_rates = {} for tool, counts in tool_success_rate.items(): - success_rates[tool] = counts['success'] / counts['total'] if counts['total'] > 0 else 0 - + success_rates[tool] = ( + counts["success"] / counts["total"] if counts["total"] > 0 else 0 + ) + return { - 'agent_name': agent_name or 'all', - 'total_decisions': len(decisions), - 'total_tool_usages': len(tool_usages), - 'decision_types': dict(decision_types), - 'average_confidence': avg_confidence, - 'tool_usage_counts': dict(tool_counts), - 'tool_success_rates': success_rates, - 'average_tool_durations_ms': avg_durations, + "agent_name": agent_name or "all", + "total_decisions": len(decisions), + "total_tool_usages": len(tool_usages), + "decision_types": dict(decision_types), + "average_confidence": avg_confidence, + "tool_usage_counts": dict(tool_counts), + "tool_success_rates": success_rates, + "average_tool_durations_ms": avg_durations, } - - def get_decision_trace(self, agent_name: str = None, limit: int = 100) -> List[Dict]: + + def get_decision_trace( + self, agent_name: str = None, limit: int = 100 + ) -> List[Dict]: """Get a trace of recent decisions""" with self._lock: decisions = self.decisions.copy() - + if agent_name: decisions = [d for d in decisions if d.agent_name == agent_name] - + # Sort by timestamp (most recent first) and limit decisions = sorted(decisions, key=lambda d: d.timestamp, reverse=True)[:limit] - + return [asdict(d) for d in decisions] - - def get_tool_usage_trace(self, agent_name: str = None, limit: int = 100) -> List[Dict]: + + def get_tool_usage_trace( + self, agent_name: str = None, limit: int = 100 + ) -> List[Dict]: """Get a trace of recent tool usages""" with self._lock: usages = self.tool_usages.copy() - + if agent_name: usages = [u for u in usages if u.agent_name == agent_name] - + # Sort by timestamp (most recent first) and limit usages = sorted(usages, key=lambda u: u.timestamp, reverse=True)[:limit] - + return 
[asdict(u) for u in usages] - + def export_analysis(self, filepath: str): """Export analysis data to JSON file""" data = { - 'export_timestamp': datetime.now().isoformat(), - 'statistics': self.get_agent_statistics(), - 'decision_trace': self.get_decision_trace(limit=1000), - 'tool_usage_trace': self.get_tool_usage_trace(limit=1000), + "export_timestamp": datetime.now().isoformat(), + "statistics": self.get_agent_statistics(), + "decision_trace": self.get_decision_trace(limit=1000), + "tool_usage_trace": self.get_tool_usage_trace(limit=1000), } - - with open(filepath, 'w') as f: + + with open(filepath, "w") as f: json.dump(data, f, indent=2, default=str) - + def clear(self): """Clear all recorded data""" with self._lock: @@ -587,7 +622,7 @@ def get_log_analyzer() -> AgentLogAnalyzer: class EnhancedAgentLogger(AgentLogger): """ Enhanced agent logger with comprehensive logging capabilities. - + Features: - Structured logging with context - Decision and reasoning logging @@ -595,19 +630,21 @@ class EnhancedAgentLogger(AgentLogger): - Debug mode with verbose output - Integration with log analyzer """ - + def __init__(self, name: str, debug_mode: bool = False): super().__init__(name) - self._debug_mode = debug_mode or os.getenv("AGENT_DEBUG_MODE", "false").lower() == "true" + self._debug_mode = ( + debug_mode or os.getenv("AGENT_DEBUG_MODE", "false").lower() == "true" + ) self._analyzer = get_log_analyzer() self._operation_stack: List[Dict[str, Any]] = [] self._lock = threading.Lock() - + def set_debug_mode(self, enabled: bool): """Enable or disable debug mode""" self._debug_mode = enabled self.debug(f"Debug mode {'enabled' if enabled else 'disabled'}") - + def log_decision( self, decision_type: str, @@ -615,11 +652,11 @@ def log_decision( reasoning: str, confidence: float = None, alternatives: List[str] = None, - **context + **context, ): """ Log an agent decision with reasoning. 
- + Args: decision_type: Type of decision (e.g., 'feature_mapping', 'code_translation') decision: The decision made @@ -638,33 +675,31 @@ def log_decision( alternatives_considered=alternatives or [], context=context, ) - + # Record for analysis self._analyzer.record_decision(decision_record) - + # Log the decision log_context = { - 'decision_type': decision_type, - 'confidence': confidence, - 'alternatives_count': len(alternatives) if alternatives else 0, - **context + "decision_type": decision_type, + "confidence": confidence, + "alternatives_count": len(alternatives) if alternatives else 0, + **context, } - + if self._debug_mode: # Verbose output in debug mode self.info( - f"DECISION [{decision_type}]: {decision}", - agent_context=log_context + f"DECISION [{decision_type}]: {decision}", agent_context=log_context ) self.debug(f" Reasoning: {reasoning}") if alternatives: self.debug(f" Alternatives considered: {alternatives}") else: self.info( - f"Decision: {decision_type} -> {decision}", - agent_context=log_context + f"Decision: {decision_type} -> {decision}", agent_context=log_context ) - + def log_tool_call( self, tool_name: str, @@ -672,11 +707,11 @@ def log_tool_call( result: Any = None, success: bool = True, duration_ms: float = 0, - error: str = None + error: str = None, ): """ Log a tool call with input/output details. 
- + Args: tool_name: Name of the tool called input_params: Parameters passed to the tool @@ -690,40 +725,42 @@ def log_tool_call( agent_name=self.agent_name, tool_name=tool_name, input_params=input_params, - output_result=result if not isinstance(result, str) else result[:1000], # Truncate long strings + output_result=( + result if not isinstance(result, str) else result[:1000] + ), # Truncate long strings success=success, duration_ms=duration_ms, error_message=error, ) - + # Record for analysis self._analyzer.record_tool_usage(usage_record) - + # Log the tool call if success: self.info( f"Tool call: {tool_name} ({duration_ms:.1f}ms)", tool_name=tool_name, - operation_time=duration_ms / 1000 + operation_time=duration_ms / 1000, ) else: self.error( f"Tool call failed: {tool_name} - {error}", tool_name=tool_name, - operation_time=duration_ms / 1000 + operation_time=duration_ms / 1000, ) - + # Debug mode: log input/output details if self._debug_mode: self.debug(f" Tool input: {json.dumps(input_params, default=str)[:500]}") if success and result is not None: result_str = str(result)[:500] self.debug(f" Tool output: {result_str}") - + def log_reasoning_step(self, step: str, details: str = None): """ Log a reasoning step in the agent's thought process. - + Args: step: Name/description of the reasoning step details: Additional details about the step @@ -732,11 +769,13 @@ def log_reasoning_step(self, step: str, details: str = None): self.debug(f"Reasoning: {step}") if details: self.debug(f" Details: {details}") - - def log_state_change(self, state_name: str, old_value: Any, new_value: Any, reason: str = None): + + def log_state_change( + self, state_name: str, old_value: Any, new_value: Any, reason: str = None + ): """ Log a state change in the agent. 
- + Args: state_name: Name of the state variable old_value: Previous value @@ -744,89 +783,95 @@ def log_state_change(self, state_name: str, old_value: Any, new_value: Any, reas reason: Reason for the change """ context = { - 'state_name': state_name, - 'old_value': str(old_value)[:200], - 'new_value': str(new_value)[:200], + "state_name": state_name, + "old_value": str(old_value)[:200], + "new_value": str(new_value)[:200], } if reason: - context['reason'] = reason - + context["reason"] = reason + self.debug(f"State change: {state_name}", agent_context=context) - + def push_operation(self, operation: str, **context): """Push an operation onto the operation stack""" with self._lock: - self._operation_stack.append({ - 'operation': operation, - 'context': context, - 'start_time': time.time(), - }) + self._operation_stack.append( + { + "operation": operation, + "context": context, + "start_time": time.time(), + } + ) self.debug(f"Started operation: {operation}") - + def pop_operation(self, success: bool = True, error: str = None): """Pop an operation from the stack and log its completion""" with self._lock: if not self._operation_stack: self.warning("Attempted to pop operation from empty stack") return - + op = self._operation_stack.pop() - - duration = time.time() - op['start_time'] - operation = op['operation'] - + + duration = time.time() - op["start_time"] + operation = op["operation"] + if success: self.info(f"Completed operation: {operation} ({duration:.3f}s)") else: self.error(f"Failed operation: {operation} ({duration:.3f}s) - {error}") - + def get_current_operation(self) -> Optional[str]: """Get the current operation name""" with self._lock: if self._operation_stack: - return self._operation_stack[-1]['operation'] + return self._operation_stack[-1]["operation"] return None - + def log_error_with_trace(self, message: str, error: Exception = None): """ Log an error with full stack trace. 
- + Args: message: Error message error: Exception object (optional) """ self.error(message) - + if error and self._debug_mode: # Log full stack trace in debug mode trace = traceback.format_exc() self.debug(f"Stack trace:\n{trace}") - + def log_performance_metric(self, metric_name: str, value: float, unit: str = None): """ Log a performance metric. - + Args: metric_name: Name of the metric value: Metric value unit: Unit of measurement """ context = { - 'metric_name': metric_name, - 'metric_value': value, - 'metric_unit': unit, + "metric_name": metric_name, + "metric_value": value, + "metric_unit": unit, } - self.debug(f"Performance: {metric_name} = {value}{unit or ''}", agent_context=context) + self.debug( + f"Performance: {metric_name} = {value}{unit or ''}", agent_context=context + ) -def get_enhanced_agent_logger(agent_name: str, debug_mode: bool = False) -> EnhancedAgentLogger: +def get_enhanced_agent_logger( + agent_name: str, debug_mode: bool = False +) -> EnhancedAgentLogger: """ Get an enhanced agent logger with comprehensive logging capabilities. - + Args: agent_name: Name of the agent debug_mode: Enable debug mode for verbose output - + Returns: EnhancedAgentLogger instance """ @@ -847,31 +892,33 @@ def disable_global_debug_mode(): def export_log_analysis(filepath: str = None): """ Export log analysis to a JSON file. 
- + Args: filepath: Path to export file (optional, defaults to log directory) """ if filepath is None: log_dir = Path(os.getenv("LOG_DIR", "/tmp/modporter-ai/logs")) log_dir.mkdir(parents=True, exist_ok=True) - filepath = log_dir / f"agent_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" - + filepath = ( + log_dir / f"agent_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" + ) + analyzer = get_log_analyzer() analyzer.export_analysis(str(filepath)) - + logger = logging.getLogger(__name__) logger.info(f"Log analysis exported to: {filepath}") - + return filepath def get_agent_log_summary(agent_name: str = None) -> Dict[str, Any]: """ Get a summary of agent logging activity. - + Args: agent_name: Specific agent name (optional, returns all if not specified) - + Returns: Dictionary with agent statistics """ diff --git a/backend/src/main.py b/backend/src/main.py index e49e0f3d..2b8e61a6 100644 --- a/backend/src/main.py +++ b/backend/src/main.py @@ -199,21 +199,31 @@ async def shutdown_event(): # Include API routers -app.include_router(performance.router, prefix="/api/v1/performance", tags=["performance"]) -app.include_router(behavioral_testing.router, prefix="/api/v1", tags=["behavioral-testing"]) +app.include_router( + performance.router, prefix="/api/v1/performance", tags=["performance"] +) +app.include_router( + behavioral_testing.router, prefix="/api/v1", tags=["behavioral-testing"] +) app.include_router(validation.router, prefix="/api/v1/validation", tags=["validation"]) app.include_router(comparison.router, prefix="/api/v1/comparison", tags=["comparison"]) app.include_router(embeddings.router, prefix="/api/v1/embeddings", tags=["embeddings"]) app.include_router(feedback.router, prefix="/api/v1", tags=["feedback"]) -app.include_router(experiments.router, prefix="/api/v1/experiments", tags=["experiments"]) +app.include_router( + experiments.router, prefix="/api/v1/experiments", tags=["experiments"] +) app.include_router(behavior_files.router, 
prefix="/api/v1", tags=["behavior-files"]) -app.include_router(behavior_templates.router, prefix="/api/v1", tags=["behavior-templates"]) +app.include_router( + behavior_templates.router, prefix="/api/v1", tags=["behavior-templates"] +) app.include_router(behavior_export.router, prefix="/api/v1", tags=["behavior-export"]) app.include_router(advanced_events.router, prefix="/api/v1", tags=["advanced-events"]) app.include_router(conversions.router) # Conversions API + WebSocket app.include_router(mod_imports.router, prefix="/api/v1/mods", tags=["mod-imports"]) app.include_router(analytics.router, prefix="/api/v1/analytics", tags=["analytics"]) -app.include_router(rate_limit_dashboard_router, prefix="/api/v1/rate-limit", tags=["rate-limiting"]) +app.include_router( + rate_limit_dashboard_router, prefix="/api/v1/rate-limit", tags=["rate-limiting"] +) # Health check endpoints (no prefix - used for Kubernetes probes) app.include_router(health.router) @@ -234,7 +244,9 @@ class ConversionRequest(BaseModel): target_version: str = Field( default="1.20.0", description="Target Minecraft version for the conversion." ) - options: Optional[dict] = Field(default=None, description="Optional conversion settings.") + options: Optional[dict] = Field( + default=None, description="Optional conversion settings." + ) @property def resolved_file_id(self) -> str: @@ -249,8 +261,12 @@ def resolved_original_name(self) -> str: class UploadResponse(BaseModel): """Response model for file upload""" - file_id: str = Field(..., description="Unique identifier assigned to the uploaded file.") - original_filename: str = Field(..., description="The original name of the uploaded file.") + file_id: str = Field( + ..., description="Unique identifier assigned to the uploaded file." + ) + original_filename: str = Field( + ..., description="The original name of the uploaded file." 
+ ) saved_filename: str = Field( ..., description="The name under which the file is saved on the server (job_id + extension).", @@ -260,7 +276,9 @@ class UploadResponse(BaseModel): default=None, description="Detected content type of the uploaded file." ) message: str = Field(..., description="Status message confirming the upload.") - filename: str = Field(..., description="The uploaded filename (matches original_filename)") + filename: str = Field( + ..., description="The uploaded filename (matches original_filename)" + ) class ConversionResponse(BaseModel): @@ -415,12 +433,14 @@ async def simulate_ai_conversion(job_id: str): logger.error(f"Error: Job {job_id} not found for AI simulation.") return - original_mod_name = job.input_data.get("original_filename", "ConvertedAddon").split( - "." - )[0] + original_mod_name = job.input_data.get( + "original_filename", "ConvertedAddon" + ).split(".")[0] # Attempt to get user_id from job input_data, fall back to a default if not found # This field might not exist in older job records. 
- user_id_for_addon = job.input_data.get("user_id", conversion_parser.DEFAULT_USER_ID) + user_id_for_addon = job.input_data.get( + "user_id", conversion_parser.DEFAULT_USER_ID + ) def mirror_dict_from_job( current_job, progress_val=None, result_url=None, error_message=None @@ -433,7 +453,9 @@ def mirror_dict_from_job( progress=( progress_val if progress_val is not None - else (current_job.progress.progress if current_job.progress else 0) + else ( + current_job.progress.progress if current_job.progress else 0 + ) ), target_version=current_job.input_data.get("target_version"), options=current_job.input_data.get("options"), @@ -460,13 +482,17 @@ def mirror_dict_from_job( ) # Stage 2: Preprocessing -> Processing - job = await crud.update_job_status(session, PyUUID(job_id), "processing") + job = await crud.update_job_status( + session, PyUUID(job_id), "processing" + ) await crud.upsert_progress(session, PyUUID(job_id), 25) mirror = mirror_dict_from_job(job, 25) conversion_jobs_db[job_id] = mirror # Keep legacy mirror for now await cache.set_job_status(job_id, mirror.model_dump()) await cache.set_progress(job_id, 25) - logger.info(f"Job {job_id}: Status updated to {job.status}, Progress: 25%") + logger.info( + f"Job {job_id}: Status updated to {job.status}, Progress: 25%" + ) # Stage 3: BedrockArchitectAgent (25-50%) await ProgressHandler.broadcast_agent_start( @@ -492,13 +518,17 @@ def mirror_dict_from_job( if job.status == "cancelled": logger.info(f"Job {job_id} was cancelled. 
Stopping AI simulation.") return - job = await crud.update_job_status(session, PyUUID(job_id), "postprocessing") + job = await crud.update_job_status( + session, PyUUID(job_id), "postprocessing" + ) await crud.upsert_progress(session, PyUUID(job_id), 50) mirror = mirror_dict_from_job(job, 50) conversion_jobs_db[job_id] = mirror await cache.set_job_status(job_id, mirror.model_dump()) await cache.set_progress(job_id, 50) - logger.info(f"Job {job_id}: Status updated to {job.status}, Progress: 50%") + logger.info( + f"Job {job_id}: Status updated to {job.status}, Progress: 50%" + ) # Stage 5: LogicTranslatorAgent (50-75%) await ProgressHandler.broadcast_agent_start( @@ -617,7 +647,9 @@ def mirror_dict_from_job( f, ) # Dummy block behavior - with open(os.path.join(bp_dir, "blocks", "simulated_block.json"), "w") as f: + with open( + os.path.join(bp_dir, "blocks", "simulated_block.json"), "w" + ) as f: json.dump( { "minecraft:block": { @@ -630,7 +662,9 @@ def mirror_dict_from_job( f, ) # Dummy recipe - with open(os.path.join(bp_dir, "recipes", "simulated_recipe.json"), "w") as f: + with open( + os.path.join(bp_dir, "recipes", "simulated_recipe.json"), "w" + ) as f: json.dump( { "minecraft:recipe_shaped": { @@ -661,7 +695,9 @@ def mirror_dict_from_job( ) # Save Addon, Blocks, Recipes (assets list in addon_data_upload is empty) - await crud.update_addon_details(session, PyUUID(job_id), addon_data_upload) + await crud.update_addon_details( + session, PyUUID(job_id), addon_data_upload + ) logger.info( f"Job {job_id}: Addon core data (metadata, blocks, recipes) saved to DB." 
) @@ -681,19 +717,27 @@ def mirror_dict_from_job( # Asset conversion integration - convert uploaded assets using AI engine try: - logger.info(f"Job {job_id}: Starting asset conversion for conversion job") + logger.info( + f"Job {job_id}: Starting asset conversion for conversion job" + ) asset_conversion_result = ( - await asset_conversion_service.convert_assets_for_conversion(job_id) + await asset_conversion_service.convert_assets_for_conversion( + job_id + ) ) if asset_conversion_result.get("success"): - converted_count = asset_conversion_result.get("converted_count", 0) + converted_count = asset_conversion_result.get( + "converted_count", 0 + ) failed_count = asset_conversion_result.get("failed_count", 0) logger.info( f"Job {job_id}: Asset conversion completed - {converted_count} converted, {failed_count} failed" ) else: - logger.warning(f"Job {job_id}: Asset conversion batch had issues") + logger.warning( + f"Job {job_id}: Asset conversion batch had issues" + ) except Exception as asset_error: logger.error(f"Job {job_id}: Asset conversion error: {asset_error}") @@ -701,7 +745,9 @@ def mirror_dict_from_job( # Original ZIP creation (can be retained or removed) os.makedirs(CONVERSION_OUTPUTS_DIR, exist_ok=True) - mock_output_filename_internal = f"{job.id}_converted.zip" # Original ZIP name + mock_output_filename_internal = ( + f"{job.id}_converted.zip" # Original ZIP name + ) mock_output_filepath = os.path.join( CONVERSION_OUTPUTS_DIR, mock_output_filename_internal ) @@ -713,7 +759,9 @@ def mirror_dict_from_job( "zip", simulated_pack_output_path, ) - logger.info(f"Job {job_id}: Original ZIP archive created at {mock_output_filepath}") + logger.info( + f"Job {job_id}: Original ZIP archive created at {mock_output_filepath}" + ) job = await crud.update_job_status(session, PyUUID(job_id), "completed") await crud.upsert_progress(session, PyUUID(job_id), 100) @@ -739,7 +787,9 @@ def mirror_dict_from_job( conversion_jobs_db[job_id] = mirror await 
cache.set_job_status(job_id, mirror.model_dump()) await cache.set_progress(job_id, 0) - logger.error(f"Job {job_id}: Status updated to FAILED due to error in processing.") + logger.error( + f"Job {job_id}: Status updated to FAILED due to error in processing." + ) # Broadcast failure to WebSocket clients await ProgressHandler.broadcast_conversion_failed(job_id, str(e_inner)) @@ -771,7 +821,9 @@ def mirror_dict_from_job( job_data_uuid_key = conversion_jobs_db[PyUUID(job_id)] job_data_uuid_key.status = "failed" # ... update other fields ... - await cache.set_job_status(str(PyUUID(job_id)), job_data_uuid_key.model_dump()) + await cache.set_job_status( + str(PyUUID(job_id)), job_data_uuid_key.model_dump() + ) except Exception as cache_error: logger.error( @@ -790,7 +842,9 @@ async def call_ai_engine_conversion(job_id: str): print(f"Error: Job {job_id} not found for AI Engine conversion.") return - def mirror_dict_from_job(job, progress_val=None, result_url=None, error_message=None): + def mirror_dict_from_job( + job, progress_val=None, result_url=None, error_message=None + ): # Compose dict for legacy mirror return ConversionJob( job_id=str(job.id), @@ -817,7 +871,9 @@ def mirror_dict_from_job(job, progress_val=None, result_url=None, error_message= output_path = os.path.join(CONVERSION_OUTPUTS_DIR, output_filename) # Get the input file path - input_file_path = os.path.join(TEMP_UPLOADS_DIR, f"{job.input_data.get('file_id')}.jar") + input_file_path = os.path.join( + TEMP_UPLOADS_DIR, f"{job.input_data.get('file_id')}.jar" + ) # Call AI Engine conversion_options = job.input_data.get("options", {}) @@ -829,11 +885,15 @@ def mirror_dict_from_job(job, progress_val=None, result_url=None, error_message= "conversion_options": conversion_options, } - print(f"Calling AI Engine at {AI_ENGINE_URL}/api/v1/convert with request: {ai_request}") + print( + f"Calling AI Engine at {AI_ENGINE_URL}/api/v1/convert with request: {ai_request}" + ) async with httpx.AsyncClient(timeout=600.0) as 
client: # 10 minute timeout # Start AI Engine conversion - response = await client.post(f"{AI_ENGINE_URL}/api/v1/convert", json=ai_request) + response = await client.post( + f"{AI_ENGINE_URL}/api/v1/convert", json=ai_request + ) if response.status_code != 200: raise Exception( @@ -849,14 +909,20 @@ def mirror_dict_from_job(job, progress_val=None, result_url=None, error_message= # Check if job was cancelled current_job = await crud.get_job(session, job_id) if current_job.status == "cancelled": - print(f"Job {job_id} was cancelled. Stopping AI Engine polling.") + print( + f"Job {job_id} was cancelled. Stopping AI Engine polling." + ) return # Get status from AI Engine - status_response = await client.get(f"{AI_ENGINE_URL}/api/v1/status/{job_id}") + status_response = await client.get( + f"{AI_ENGINE_URL}/api/v1/status/{job_id}" + ) if status_response.status_code != 200: - print(f"Failed to get AI Engine status: {status_response.status_code}") + print( + f"Failed to get AI Engine status: {status_response.status_code}" + ) continue ai_status = status_response.json() @@ -896,7 +962,9 @@ def mirror_dict_from_job(job, progress_val=None, result_url=None, error_message= ) # Verify the file exists if not os.path.exists(output_path): - print(f"Warning: Expected output file not found at {output_path}") + print( + f"Warning: Expected output file not found at {output_path}" + ) else: print(f"Job {job_id}: AI Engine conversion FAILED") @@ -1029,7 +1097,9 @@ async def get_conversion_status( descriptive_message = "Conversion completed successfully." elif status == "failed": descriptive_message = ( - f"Conversion failed: {error_message}" if error_message else "Conversion failed." + f"Conversion failed: {error_message}" + if error_message + else "Conversion failed." ) elif status == "cancelled": descriptive_message = "Job was cancelled by the user." 
@@ -1051,7 +1121,9 @@ async def get_conversion_status( # Fallback: load from DB job = await crud.get_job(db, job_id) if not job: - raise HTTPException(status_code=404, detail=f"Conversion job with ID '{job_id}' not found.") + raise HTTPException( + status_code=404, detail=f"Conversion job with ID '{job_id}' not found." + ) progress = job.progress.progress if job.progress else 0 error_message = None result_url = None @@ -1103,7 +1175,9 @@ async def get_conversion_status( ) -@app.get("/api/v1/conversions", response_model=List[ConversionStatus], tags=["conversion"]) +@app.get( + "/api/v1/conversions", response_model=List[ConversionStatus], tags=["conversion"] +) async def list_conversions(db: AsyncSession = Depends(get_db)): """ List all current and past conversion jobs. @@ -1164,7 +1238,9 @@ async def cancel_conversion( """ job = await crud.get_job(db, job_id) if not job: - raise HTTPException(status_code=404, detail=f"Conversion job with ID '{job_id}' not found.") + raise HTTPException( + status_code=404, detail=f"Conversion job with ID '{job_id}' not found." + ) if job.status == "cancelled": return {"message": f"Conversion job {job_id} is already cancelled."} job = await crud.update_job_status(db, job_id, "cancelled") @@ -1206,7 +1282,9 @@ async def download_converted_mod( job = conversion_jobs_db.get(job_id) if not job: - raise HTTPException(status_code=404, detail=f"Conversion job with ID '{job_id}' not found.") + raise HTTPException( + status_code=404, detail=f"Conversion job with ID '{job_id}' not found." + ) if job.status != "completed": raise HTTPException( @@ -1232,7 +1310,9 @@ async def download_converted_mod( if not os.path.exists(file_path): print(f"Error: Converted file not found at path: {file_path} for job {job_id}") # This case might indicate an issue post-completion or if the file was manually removed. 
- raise HTTPException(status_code=404, detail="Converted file not found on server.") + raise HTTPException( + status_code=404, detail="Converted file not found on server." + ) # Determine a user-friendly download filename original_filename_base = os.path.splitext(job.original_filename)[0] @@ -1358,8 +1438,12 @@ async def upsert_addon_details( and will create the addon if it doesn't exist, or update it if it does. For child collections (blocks, assets, recipes), this performs a full replacement. """ - db_addon = await crud.update_addon_details(session=db, addon_id=addon_id, addon_data=addon_data) - if db_addon is None: # Should not happen if crud.update_addon_details works as expected + db_addon = await crud.update_addon_details( + session=db, addon_id=addon_id, addon_data=addon_data + ) + if ( + db_addon is None + ): # Should not happen if crud.update_addon_details works as expected raise HTTPException(status_code=500, detail="Error processing addon data") return db_addon @@ -1386,17 +1470,23 @@ async def create_addon_asset_endpoint( session=db, addon_id=addon_id ) # Using get_addon_details to ensure addon exists if not addon: - raise HTTPException(status_code=404, detail=f"Addon with id {addon_id} not found.") + raise HTTPException( + status_code=404, detail=f"Addon with id {addon_id} not found." 
+ ) try: db_asset = await crud.create_addon_asset( session=db, addon_id=addon_id, file=file, asset_type=asset_type ) - except ValueError as e: # Catch errors like Addon not found from CRUD (though checked above) + except ( + ValueError + ) as e: # Catch errors like Addon not found from CRUD (though checked above) raise HTTPException(status_code=404, detail=str(e)) except Exception as e: logger.error(f"Failed to create addon asset: {e}", exc_info=True) - raise HTTPException(status_code=500, detail=f"Failed to create addon asset: {str(e)}") + raise HTTPException( + status_code=500, detail=f"Failed to create addon asset: {str(e)}" + ) return db_asset @@ -1422,7 +1512,9 @@ async def get_addon_asset_file( file_full_path = os.path.join(crud.BASE_ASSET_PATH, db_asset.path) if not os.path.exists(file_full_path): - logger.error(f"Asset file not found on disk: {file_full_path} for asset_id {asset_id}") + logger.error( + f"Asset file not found on disk: {file_full_path} for asset_id {asset_id}" + ) raise HTTPException(status_code=404, detail="Asset file not found on server.") return FileResponse( @@ -1454,17 +1546,25 @@ async def update_addon_asset_endpoint( ) try: - updated_asset = await crud.update_addon_asset(session=db, asset_id=asset_id, file=file) + updated_asset = await crud.update_addon_asset( + session=db, asset_id=asset_id, file=file + ) except Exception as e: logger.error(f"Failed to update addon asset {asset_id}: {e}", exc_info=True) - raise HTTPException(status_code=500, detail=f"Failed to update addon asset: {str(e)}") + raise HTTPException( + status_code=500, detail=f"Failed to update addon asset: {str(e)}" + ) if not updated_asset: # Should be caught by prior check or raise exception in CRUD - raise HTTPException(status_code=404, detail="Asset not found after update attempt.") + raise HTTPException( + status_code=404, detail="Asset not found after update attempt." 
+ ) return updated_asset -@app.delete("/api/v1/addons/{addon_id}/assets/{asset_id}", status_code=204, tags=["addons"]) +@app.delete( + "/api/v1/addons/{addon_id}/assets/{asset_id}", status_code=204, tags=["addons"] +) async def delete_addon_asset_endpoint( addon_id: PyUUID, # Validate addon ownership of asset asset_id: PyUUID, @@ -1482,7 +1582,9 @@ async def delete_addon_asset_endpoint( deleted_asset_info = await crud.delete_addon_asset(session=db, asset_id=asset_id) if not deleted_asset_info: # Should be caught by prior check - raise HTTPException(status_code=404, detail="Asset not found during delete operation.") + raise HTTPException( + status_code=404, detail="Asset not found during delete operation." + ) # Return 204 No Content by default for DELETE operations # FastAPI will automatically handle returning no body if status_code is 204 @@ -1509,7 +1611,9 @@ async def export_addon_mcaddon(addon_id: PyUUID, db: AsyncSession = Depends(get_ addon_pydantic=addon_details, asset_base_path=crud.BASE_ASSET_PATH ) except Exception as e: - logger.error(f"Error creating .mcaddon package for addon {addon_id}: {e}", exc_info=True) + logger.error( + f"Error creating .mcaddon package for addon {addon_id}: {e}", exc_info=True + ) raise HTTPException(status_code=500, detail=f"Failed to export addon: {str(e)}") # Sanitize addon name for filename diff --git a/backend/src/services/ai_engine_client.py b/backend/src/services/ai_engine_client.py index e517e5aa..6ca7f26d 100644 --- a/backend/src/services/ai_engine_client.py +++ b/backend/src/services/ai_engine_client.py @@ -17,7 +17,9 @@ # AI Engine configuration AI_ENGINE_URL = os.getenv("AI_ENGINE_URL", "http://ai-engine:8001") -AI_ENGINE_TIMEOUT = httpx.Timeout(1800.0) # 30 minutes timeout for long-running conversions +AI_ENGINE_TIMEOUT = httpx.Timeout( + 1800.0 +) # 30 minutes timeout for long-running conversions # Default poll interval for checking conversion status DEFAULT_POLL_INTERVAL = 2.0 # seconds @@ -279,7 +281,11 @@ async 
def poll_conversion_status( except AIEngineError as e: if e.status_code == 404: # Job not found - treat as terminal - yield {"status": "failed", "message": "Job not found", "progress": 0} + yield { + "status": "failed", + "message": "Job not found", + "progress": 0, + } break raise diff --git a/backend/src/services/structured_logging.py b/backend/src/services/structured_logging.py index 3ea81f2b..4cb724e2 100644 --- a/backend/src/services/structured_logging.py +++ b/backend/src/services/structured_logging.py @@ -19,7 +19,9 @@ from structlog.stdlib import ProcessorFormatter # Context variable to store correlation ID across async operations -correlation_id_var: ContextVar[Optional[str]] = ContextVar("correlation_id", default=None) +correlation_id_var: ContextVar[Optional[str]] = ContextVar( + "correlation_id", default=None +) # Context variable to store request metadata request_metadata_var: ContextVar[Optional[Dict[str, Any]]] = ContextVar( @@ -99,7 +101,8 @@ def configure_structlog( else: console_handler.setFormatter( logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S" + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", ) ) root_logger.addHandler(console_handler) @@ -207,7 +210,8 @@ def get_standard_logger(name: str) -> logging.Logger: console_handler.setLevel(logging.INFO) console_formatter = logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S" + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", ) console_handler.setFormatter(console_formatter) logger.addHandler(console_handler) @@ -307,7 +311,9 @@ def __enter__(self): # Bind to structlog context structlog.contextvars.clear_contextvars() - structlog.contextvars.bind_contextvars(correlation_id=self.correlation_id, **self.metadata) + structlog.contextvars.bind_contextvars( + correlation_id=self.correlation_id, **self.metadata + ) return 
"""
Distributed Tracing Service using OpenTelemetry.

This module provides tracing capabilities for the ModPorter AI application,
including W3C trace context propagation between services.
"""

import logging
import os
from typing import Optional

from opentelemetry import trace
from opentelemetry.context import Context
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.sdk.extension.aws.resource.ec2 import AwsEc2ResourceDetector
from opentelemetry.sdk.extension.aws.resource.ecs import AwsEcsResourceDetector
from opentelemetry.sdk.resources import SERVICE_NAME, SERVICE_VERSION, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.trace import Status, StatusCode
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

logger = logging.getLogger(__name__)

# W3C Trace Context propagator used to extract/inject context across services.
tracer_propagator = TraceContextTextMapPropagator()

# Process-wide singletons, populated lazily by get_tracer() and cleared by
# shutdown_tracing() so tracing can be re-initialized (e.g. in tests).
_tracer: Optional[trace.Tracer] = None
_tracer_provider: Optional[TracerProvider] = None


def _build_resource(service_name: str) -> Resource:
    """Build the OTel Resource for this service, merging AWS metadata when available."""
    resource = Resource.create(
        {
            SERVICE_NAME: service_name,
            SERVICE_VERSION: os.getenv("SERVICE_VERSION", "1.0.0"),
        }
    )
    # Cloud resource detectors raise when not running on that platform;
    # detection is best-effort and silently skipped elsewhere.
    for detector in (AwsEc2ResourceDetector(), AwsEcsResourceDetector()):
        try:
            resource = resource.merge(detector.detect())
        except Exception:
            pass
    return resource


def _configure_exporters(provider: TracerProvider) -> None:
    """Attach span exporters to *provider* based on environment configuration.

    Environment variables:
        TRACING_ENABLED: "true"/"false" master switch (default "true").
        TRACING_EXPORTER: "jaeger" (default) or "otlp".
        JAEGER_HOST / JAEGER_PORT: Jaeger agent address (default localhost:6831).
        OTLP_ENDPOINT: OTLP gRPC endpoint (default http://localhost:4317).
        TRACING_CONSOLE: "true" to also print spans to stdout (default "false").
    """
    if os.getenv("TRACING_ENABLED", "true").lower() != "true":
        return

    exporter_name = os.getenv("TRACING_EXPORTER", "jaeger").lower()
    if exporter_name == "jaeger":
        jaeger_host = os.getenv("JAEGER_HOST", "localhost")
        jaeger_port = int(os.getenv("JAEGER_PORT", "6831"))
        provider.add_span_processor(
            BatchSpanProcessor(
                JaegerExporter(agent_host_name=jaeger_host, agent_port=jaeger_port)
            )
        )
        logger.info(f"Jaeger tracing enabled: {jaeger_host}:{jaeger_port}")
    elif exporter_name == "otlp":
        otlp_endpoint = os.getenv("OTLP_ENDPOINT", "http://localhost:4317")
        provider.add_span_processor(
            BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True))
        )
        logger.info(f"OTLP tracing enabled: {otlp_endpoint}")

    # Console exporter is additive, intended for local development.
    if os.getenv("TRACING_CONSOLE", "false").lower() == "true":
        provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
        logger.info("Console span exporter enabled")


def get_tracer(service_name: str = "modporter-backend") -> trace.Tracer:
    """
    Get or create the process-wide tracer for the given service.

    The first call builds the TracerProvider (resource + exporters) and installs
    it globally; subsequent calls return the cached tracer regardless of the
    service_name argument.

    Args:
        service_name: Name of the service for tracing.

    Returns:
        Configured tracer instance.
    """
    global _tracer, _tracer_provider

    if _tracer is not None:
        return _tracer

    _tracer_provider = TracerProvider(resource=_build_resource(service_name))
    _configure_exporters(_tracer_provider)

    # Install as the global provider so instrumentation libraries pick it up.
    trace.set_tracer_provider(_tracer_provider)
    _tracer = trace.get_tracer(service_name)

    logger.info(f"Tracing initialized for service: {service_name}")
    return _tracer


def init_tracing(
    app=None,
    service_name: str = "modporter-backend",
    instrument_fastapi: bool = True,
    instrument_httpx: bool = True,
    instrument_redis: bool = True,
) -> trace.Tracer:
    """
    Initialize tracing with automatic instrumentation.

    Args:
        app: FastAPI application instance (optional; required for FastAPI
            instrumentation).
        service_name: Name of the service.
        instrument_fastapi: Whether to instrument FastAPI.
        instrument_httpx: Whether to instrument HTTPX clients.
        instrument_redis: Whether to instrument Redis clients.

    Returns:
        Configured tracer instance.
    """
    tracer = get_tracer(service_name)

    # Instrumentation failures are non-fatal: the app should still start even
    # if an optional instrumentation package is missing or misbehaves.
    if app and instrument_fastapi:
        try:
            FastAPIInstrumentor.instrument_app(app)
            logger.info("FastAPI instrumentation enabled")
        except Exception as e:
            logger.warning(f"Failed to instrument FastAPI: {e}")

    if instrument_httpx:
        try:
            HTTPXClientInstrumentor().instrument()
            logger.info("HTTPX instrumentation enabled")
        except Exception as e:
            logger.warning(f"Failed to instrument HTTPX: {e}")

    if instrument_redis:
        try:
            RedisInstrumentor().instrument()
            logger.info("Redis instrumentation enabled")
        except Exception as e:
            logger.warning(f"Failed to instrument Redis: {e}")

    return tracer


def extract_trace_context(carrier: dict) -> Context:
    """
    Extract W3C trace context from a carrier (e.g. incoming HTTP headers).

    Args:
        carrier: Mapping containing trace context headers (e.g. ``traceparent``).

    Returns:
        Extracted context.
    """
    return tracer_propagator.extract(carrier)


def inject_trace_context(carrier: dict) -> dict:
    """
    Inject the current trace context into a carrier (e.g. outgoing HTTP headers).

    Args:
        carrier: Mutable mapping to inject trace context into (mutated in place).

    Returns:
        The same carrier, for call-chaining convenience.
    """
    tracer_propagator.inject(carrier)
    return carrier


def create_span(
    name: str,
    context: Optional[Context] = None,
    kind: trace.SpanKind = trace.SpanKind.INTERNAL,
) -> trace.Span:
    """
    Create and start a new span.

    The returned span is live: the caller is responsible for calling
    ``span.end()`` (or wrapping it in ``trace.use_span(span, end_on_exit=True)``).
    The previous implementation returned the span from inside a
    ``start_as_current_span`` context manager, so the span was already ended
    on return and any attributes or exceptions recorded afterwards were lost.

    Args:
        name: Name of the span.
        context: Parent context (optional; defaults to the current context).
        kind: Span kind.

    Returns:
        The newly started, un-ended span.
    """
    return get_tracer().start_span(name, context=context, kind=kind)


def add_span_attributes(span: trace.Span, attributes: dict) -> None:
    """
    Add attributes to a span, skipping None values.

    Values are stringified because OTel attribute values must be primitives.

    Args:
        span: Span to add attributes to.
        attributes: Dictionary of attributes.
    """
    for key, value in attributes.items():
        if value is not None:
            span.set_attribute(key, str(value))


def record_span_exception(span: trace.Span, exception: Exception) -> None:
    """
    Mark a span as errored and record the exception event on it.

    Args:
        span: Span to record the exception on.
        exception: Exception to record.
    """
    span.set_status(Status(StatusCode.ERROR, str(exception)))
    span.record_exception(exception)


def shutdown_tracing() -> None:
    """Shutdown the tracing provider and flush any pending spans.

    Also clears the cached tracer/provider so a later ``get_tracer()`` builds a
    fresh provider; previously the stale tracer (bound to the shut-down
    provider) kept being returned.
    """
    global _tracer, _tracer_provider

    if _tracer_provider:
        _tracer_provider.shutdown()
        _tracer_provider = None
        _tracer = None
        logger.info("Tracing provider shutdown")


class TracingMiddleware:
    """
    ASGI middleware hook for trace context propagation.

    The FastAPI instrumentation already extracts/injects trace context
    automatically; this middleware exists as an extension point for custom
    use cases.
    """

    def __init__(self, app):
        # The wrapped ASGI application.
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        # ASGI headers are (bytes, bytes) pairs encoded as latin-1 per the ASGI
        # spec (UTF-8 decoding can raise on valid header bytes). dict() keeps
        # only the last value for duplicate names, which is fine for the single
        # W3C ``traceparent`` header.
        headers = {
            key.decode("latin-1"): value.decode("latin-1")
            for key, value in scope.get("headers", [])
        }

        # The extracted context is currently unused — FastAPI instrumentation
        # handles propagation — but the call is kept as a customization hook.
        extract_trace_context(headers)

        await self.app(scope, receive, send)


def get_current_span() -> Optional[trace.Span]:
    """
    Get the current active span, if any.

    ``trace.get_current_span()`` returns the truthy ``INVALID_SPAN`` sentinel
    rather than None when no span is active, so validity is checked explicitly.

    Returns:
        Current span, or None when no valid span is active.
    """
    span = trace.get_current_span()
    if span.get_span_context().is_valid:
        return span
    return None


def get_trace_id() -> Optional[str]:
    """
    Get the current trace ID as a 32-character lowercase hex string.

    Returns:
        Trace ID, or None when there is no valid active span (previously this
        returned an all-zero ID because the invalid-span sentinel is truthy).
    """
    span = get_current_span()
    if span is None:
        return None
    return format(span.get_span_context().trace_id, "032x")


def get_span_id() -> Optional[str]:
    """
    Get the current span ID as a 16-character lowercase hex string.

    Returns:
        Span ID, or None when there is no valid active span.
    """
    span = get_current_span()
    if span is None:
        return None
    return format(span.get_span_context().span_id, "016x")