diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7b0e6885..5f4dc526 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -42,9 +42,12 @@ jobs: ruff: ${{ steps.changes.outputs.ruff }} steps: - uses: actions/checkout@v6 + with: + fetch-depth: 0 - uses: dorny/paths-filter@v3 id: changes with: + base: main filters: | backend: - 'backend/**' diff --git a/ai-engine/main.py b/ai-engine/main.py index f20a88bb..0411b0ba 100644 --- a/ai-engine/main.py +++ b/ai-engine/main.py @@ -15,6 +15,9 @@ from dotenv import load_dotenv import redis.asyncio as aioredis +# Import tracing module +from tracing import init_tracing, shutdown_tracing + # Configure logging using centralized configuration from utils.logging_config import setup_logging, get_agent_logger, configure_structlog @@ -27,12 +30,12 @@ # Also configure structlog for structured JSON logging in production configure_structlog( debug_mode=debug_mode, - json_format=os.getenv("LOG_JSON_FORMAT", "false").lower() == "true" + json_format=os.getenv("LOG_JSON_FORMAT", "false").lower() == "true", ) setup_logging( debug_mode=debug_mode, - enable_file_logging=os.getenv("ENABLE_FILE_LOGGING", "true").lower() == "true" + enable_file_logging=os.getenv("ENABLE_FILE_LOGGING", "true").lower() == "true", ) logger = get_agent_logger("main") @@ -43,7 +46,11 @@ # Import progress callback for real-time updates try: - from utils.progress_callback import create_progress_callback, cleanup_progress_callback + from utils.progress_callback import ( + create_progress_callback, + cleanup_progress_callback, + ) + PROGRESS_CALLBACK_AVAILABLE = True except ImportError: PROGRESS_CALLBACK_AVAILABLE = False @@ -59,14 +66,16 @@ if debug_mode: print_gpu_info() + # Status enumeration for conversion states class ConversionStatusEnum(str, Enum): QUEUED = "queued" - IN_PROGRESS = "in_progress" + IN_PROGRESS = "in_progress" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" + # FastAPI app configuration app = 
FastAPI( title="ModPorter AI Engine", @@ -84,7 +93,9 @@ class ConversionStatusEnum(str, Enum): ) # CORS middleware - Restrict origins for security -allowed_origins = os.getenv("ALLOWED_ORIGINS", "http://localhost:3000,http://localhost:8080").split(",") +allowed_origins = os.getenv("ALLOWED_ORIGINS", "http://localhost:3000,http://localhost:8080").split( + "," +) app.add_middleware( CORSMiddleware, allow_origins=allowed_origins, @@ -98,55 +109,60 @@ class ConversionStatusEnum(str, Enum): assumption_engine = None redis_client = None + # Redis job state management class RedisJobManager: def __init__(self, redis_client): self.redis = redis_client self.available = True - + async def set_job_status(self, job_id: str, status: "ConversionStatus") -> None: """Store job status in Redis with error handling""" try: if not self.available: raise HTTPException(status_code=503, detail="Job state storage unavailable") - + status_dict = status.model_dump() - status_dict['started_at'] = status_dict['started_at'].isoformat() if status_dict['started_at'] else None - status_dict['completed_at'] = status_dict['completed_at'].isoformat() if status_dict['completed_at'] else None - + status_dict["started_at"] = ( + status_dict["started_at"].isoformat() if status_dict["started_at"] else None + ) + status_dict["completed_at"] = ( + status_dict["completed_at"].isoformat() if status_dict["completed_at"] else None + ) + await self.redis.set( - f"ai_engine:jobs:{job_id}", + f"ai_engine:jobs:{job_id}", json.dumps(status_dict), - ex=3600 # Expire after 1 hour + ex=3600, # Expire after 1 hour ) except Exception as e: logger.error(f"Failed to store job status in Redis: {e}", exc_info=True) self.available = False raise HTTPException(status_code=503, detail="Job state storage failed") - + async def get_job_status(self, job_id: str) -> Optional["ConversionStatus"]: """Retrieve job status from Redis with error handling""" try: if not self.available: return None - + data = await 
self.redis.get(f"ai_engine:jobs:{job_id}") if not data: return None - + status_dict = json.loads(data) # Convert ISO strings back to datetime - if status_dict.get('started_at'): - status_dict['started_at'] = datetime.fromisoformat(status_dict['started_at']) - if status_dict.get('completed_at'): - status_dict['completed_at'] = datetime.fromisoformat(status_dict['completed_at']) - + if status_dict.get("started_at"): + status_dict["started_at"] = datetime.fromisoformat(status_dict["started_at"]) + if status_dict.get("completed_at"): + status_dict["completed_at"] = datetime.fromisoformat(status_dict["completed_at"]) + return ConversionStatus(**status_dict) except Exception as e: logger.error(f"Failed to retrieve job status from Redis: {e}", exc_info=True) self.available = False return None - + async def delete_job(self, job_id: str) -> None: """Remove job from Redis""" try: @@ -155,32 +171,45 @@ async def delete_job(self, job_id: str) -> None: except Exception as e: logger.error(f"Failed to delete job from Redis: {e}", exc_info=True) + job_manager = None + # Pydantic models class HealthResponse(BaseModel): """Health check response model""" + status: str version: str timestamp: str services: Dict[str, str] + class ConversionRequest(BaseModel): """Conversion request model""" + job_id: str = Field(..., description="Unique job identifier") mod_file_path: str = Field(..., description="Path to the mod file") - conversion_options: Optional[Dict[str, Any]] = Field(default={}, description="Conversion options") - experiment_variant: Optional[str] = Field(default=None, description="Experiment variant ID for A/B testing") + conversion_options: Optional[Dict[str, Any]] = Field( + default={}, description="Conversion options" + ) + experiment_variant: Optional[str] = Field( + default=None, description="Experiment variant ID for A/B testing" + ) + class ConversionResponse(BaseModel): """Conversion response model""" + job_id: str status: str message: str estimated_time: Optional[int] = 
None + class ConversionStatus(BaseModel): """Conversion status model""" + job_id: str status: str progress: int @@ -189,70 +218,92 @@ class ConversionStatus(BaseModel): started_at: Optional[datetime] = None completed_at: Optional[datetime] = None + # Job storage is now handled by RedisJobManager - no global dict + @app.on_event("startup") async def startup_event(): """Initialize services on startup""" global conversion_crew, assumption_engine, redis_client, job_manager - + logger.info("Starting ModPorter AI Engine...") - + + # Initialize tracing + init_tracing(app=app, service_name="modporter-ai-engine") + logger.info("Distributed tracing initialized") + try: # Initialize Redis connection redis_url = os.getenv("REDIS_URL", "redis://localhost:6379") redis_client = aioredis.from_url(redis_url, decode_responses=True) - + # Test Redis connection await redis_client.ping() logger.info("Redis connection established") - + # Initialize job manager job_manager = RedisJobManager(redis_client) logger.info("RedisJobManager initialized") - + # Initialize SmartAssumptionEngine assumption_engine = SmartAssumptionEngine() logger.info("SmartAssumptionEngine initialized") - + # Note: We now initialize the conversion crew per request to support variants # The global conversion_crew will remain None - + logger.info("ModPorter AI Engine startup complete") - + except Exception as e: logger.error(f"Failed to initialize AI Engine: {e}", exc_info=True) raise HTTPException(status_code=503, detail="Service initialization failed") + +@app.on_event("shutdown") +async def shutdown_event(): + """Cleanup on shutdown""" + global redis_client + + # Shutdown tracing + shutdown_tracing() + logger.info("Distributed tracing shutdown") + + # Close Redis connection + if redis_client: + await redis_client.close() + logger.info("Redis connection closed") + + logger.info("ModPorter AI Engine shutdown complete") + + @app.get("/api/v1/health", response_model=HealthResponse, tags=["health"]) async def 
health_check(): """Check the health status of the AI Engine""" services = { "assumption_engine": "healthy" if assumption_engine else "unavailable", } - + # Conversion crew is now initialized per request, so we don't check it here - + return HealthResponse( status="healthy", version="1.0.0", timestamp=datetime.utcnow().isoformat(), - services=services + services=services, ) + @app.post("/api/v1/convert", response_model=ConversionResponse, tags=["conversion"]) -async def start_conversion( - request: ConversionRequest, - background_tasks: BackgroundTasks -): +async def start_conversion(request: ConversionRequest, background_tasks: BackgroundTasks): """Start a new mod conversion job""" - + if not job_manager or not job_manager.available: raise HTTPException(status_code=503, detail="Job state storage unavailable") - + # Initialize conversion crew with variant if specified # The crew is now initialized in the background task to avoid blocking the request - + # Create job status job_status = ConversionStatus( job_id=request.job_id, @@ -260,75 +311,83 @@ async def start_conversion( progress=0, current_stage="initialization", message="Conversion job queued", - started_at=datetime.utcnow() + started_at=datetime.utcnow(), ) - + # Store in Redis instead of global dict await job_manager.set_job_status(request.job_id, job_status) - + # Start conversion in background background_tasks.add_task( process_conversion, request.job_id, request.mod_file_path, request.conversion_options, - request.experiment_variant # Pass variant to process_conversion + request.experiment_variant, # Pass variant to process_conversion ) - + logger.info(f"Started conversion job {request.job_id}") - + return ConversionResponse( job_id=request.job_id, status="queued", message="Conversion job started", - estimated_time=120 # Placeholder - would be calculated based on mod size + estimated_time=120, # Placeholder - would be calculated based on mod size ) + @app.get("/api/v1/status/{job_id}", 
response_model=ConversionStatus, tags=["conversion"]) async def get_conversion_status(job_id: str): """Get the status of a conversion job""" - + if not job_manager: raise HTTPException(status_code=503, detail="Job state storage unavailable") - + job_status = await job_manager.get_job_status(job_id) if not job_status: raise HTTPException(status_code=404, detail="Job not found") - + return job_status + @app.get("/api/v1/jobs", response_model=List[ConversionStatus], tags=["conversion"]) async def list_jobs(): """List all active conversion jobs""" if not job_manager or not job_manager.available: raise HTTPException(status_code=503, detail="Job state storage unavailable") - + # Note: In production, implement pagination and filtering # For now, return empty list as Redis doesn't have easy "list all" without keys logger.warning("list_jobs endpoint returns empty - implement Redis SCAN for production") return [] -async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, Any], experiment_variant: Optional[str] = None): + +async def process_conversion( + job_id: str, + mod_file_path: str, + options: Dict[str, Any], + experiment_variant: Optional[str] = None, +): """Process a conversion job using the AI crew""" - + progress_callback = None - + try: # Get current job status job_status = await job_manager.get_job_status(job_id) if not job_status: logger.error(f"Job {job_id} not found during processing") return - + # Update job status job_status.status = "processing" job_status.current_stage = "analysis" job_status.message = "Analyzing mod structure" job_status.progress = 10 await job_manager.set_job_status(job_id, job_status) - + logger.info(f"Processing conversion for job {job_id} with variant {experiment_variant}") - + # Create progress callback for real-time updates if available if PROGRESS_CALLBACK_AVAILABLE and create_progress_callback: try: @@ -336,25 +395,27 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, 
logger.info(f"Progress callback created for job {job_id}") except Exception as e: logger.warning(f"Failed to create progress callback: {e}") - + # Prepare output path output_path = options.get("output_path") if not output_path: # Default output path using job_id pattern that backend expects # Use the mounted volume path inside the container - output_path = os.path.join(os.getenv("CONVERSION_OUTPUT_DIR", "/app/conversion_outputs"), f"{job_id}_converted.mcaddon") - + output_path = os.path.join( + os.getenv("CONVERSION_OUTPUT_DIR", "/app/conversion_outputs"), + f"{job_id}_converted.mcaddon", + ) + # Ensure the output directory exists os.makedirs(os.path.dirname(output_path), exist_ok=True) - + try: # Initialize conversion crew with variant if specified and pass progress callback crew = ModPorterConversionCrew( - variant_id=experiment_variant, - progress_callback=progress_callback + variant_id=experiment_variant, progress_callback=progress_callback ) logger.info(f"ModPorterConversionCrew initialized with variant: {experiment_variant}") - + # Update status for analysis stage job_status = await job_manager.get_job_status(job_id) if job_status: @@ -362,27 +423,32 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, job_status.message = "Analyzing Java mod structure" job_status.progress = 20 await job_manager.set_job_status(job_id, job_status) - + # Execute the actual AI conversion using the conversion crew from pathlib import Path + conversion_result = crew.convert_mod( mod_path=Path(mod_file_path), output_path=Path(output_path), smart_assumptions=options.get("smart_assumptions", True), - include_dependencies=options.get("include_dependencies", True) + include_dependencies=options.get("include_dependencies", True), ) - + # Update progress based on conversion result if conversion_result.get("status") == "failed": # Mark job as failed job_status = await job_manager.get_job_status(job_id) if job_status: job_status.status = "failed" - 
job_status.message = f"Conversion failed: {conversion_result.get('error', 'Unknown error')}" + job_status.message = ( + f"Conversion failed: {conversion_result.get('error', 'Unknown error')}" + ) await job_manager.set_job_status(job_id, job_status) - logger.error(f"Conversion failed for job {job_id}: {conversion_result.get('error')}") + logger.error( + f"Conversion failed for job {job_id}: {conversion_result.get('error')}" + ) return - + # Update progress through conversion stages stages = [ ("planning", "Creating conversion plan", 40), @@ -391,7 +457,7 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, ("packaging", "Packaging Bedrock addon", 90), ("validation", "Validating conversion", 95), ] - + for stage, message, progress in stages: job_status = await job_manager.get_job_status(job_id) if job_status: @@ -399,16 +465,19 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, job_status.message = message job_status.progress = progress await job_manager.set_job_status(job_id, job_status) - + # Short delay to show progress import asyncio + await asyncio.sleep(0.5) - + # Verify output file was created if not os.path.exists(output_path): logger.error(f"Output file not created by conversion crew: {output_path}") - logger.error("This indicates a serious conversion failure that should not be masked") - + logger.error( + "This indicates a serious conversion failure that should not be masked" + ) + # Mark job as failed explicitly instead of creating a fake successful output job_status = await job_manager.get_job_status(job_id) if job_status: @@ -416,9 +485,9 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, job_status.message = "Conversion crew failed to produce output file - this indicates a serious error in the conversion process" await job_manager.set_job_status(job_id, job_status) return - + logger.info(f"Conversion completed successfully: {output_path}") - + except 
Exception as conversion_error: logger.error(f"Failed to convert mod {mod_file_path}: {conversion_error}") # Mark job as failed if conversion fails @@ -436,12 +505,12 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, job_status.message = "Conversion completed successfully" job_status.completed_at = datetime.utcnow() await job_manager.set_job_status(job_id, job_status) - + logger.info(f"Completed conversion for job {job_id}") - + except Exception as e: logger.error(f"Conversion failed for job {job_id}: {e}", exc_info=True) - + # Update job status to failed job_status = await job_manager.get_job_status(job_id) if job_status: @@ -450,7 +519,10 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, try: await job_manager.set_job_status(job_id, job_status) except Exception as status_error: - logger.error(f"Failed to update job status after error: {status_error}", exc_info=True) + logger.error( + f"Failed to update job status after error: {status_error}", + exc_info=True, + ) finally: # Clean up progress callback if progress_callback and PROGRESS_CALLBACK_AVAILABLE and cleanup_progress_callback: @@ -460,10 +532,11 @@ async def process_conversion(job_id: str, mod_file_path: str, options: Dict[str, except Exception as e: logger.warning(f"Failed to cleanup progress callback: {e}") + if __name__ == "__main__": uvicorn.run( "main:app", host=os.getenv("HOST", "0.0.0.0"), port=int(os.getenv("PORT", 8001)), - reload=os.getenv("DEBUG", "false").lower() == "true" + reload=os.getenv("DEBUG", "false").lower() == "true", ) diff --git a/ai-engine/pyproject.toml b/ai-engine/pyproject.toml index b3d5a6a0..420bfc86 100644 --- a/ai-engine/pyproject.toml +++ b/ai-engine/pyproject.toml @@ -16,11 +16,15 @@ exclude = [ ] [tool.ruff.lint] -# Enable pycodestyle (E), Pyflakes (F), isort (I), flake8-quotes (Q), flake8-naming (N), pydocstyle (D), pyupgrade (UP) -select = ["E", "F", "I", "Q", "N", "D", "UP"] +# Enable pycodestyle (E), 
Pyflakes (F), warnings (W), isort (I), flake8-quotes (Q), flake8-naming (N), pydocstyle (D), pyupgrade (UP) +select = ["E", "F", "W", "I", "Q", "N", "D", "UP"] ignore = [ "E402", # module level import not at top of file "E501", # line too long (handled by formatter) + # W (pycodestyle warnings) ignores for legacy code + "W291", # trailing whitespace (legacy code has inconsistent whitespace) + "W292", # no newline at end of file (legacy code has inconsistent newlines) + "W293", # blank line contains whitespace (legacy code has inconsistent blank lines) "Q000", # bad quote marks "Q001", # bad quote marks in multiline strings "Q002", # bad quote marks in strings diff --git a/ai-engine/requirements-dev.txt b/ai-engine/requirements-dev.txt index 93146153..8e90a484 100644 --- a/ai-engine/requirements-dev.txt +++ b/ai-engine/requirements-dev.txt @@ -15,4 +15,8 @@ black>=23.0.0 mypy>=1.5.0 # Development Tools -pre-commit>=3.0.0 \ No newline at end of file +pre-commit>=3.0.0 + +# Dependency checking +pip-audit>=2.7.0 +pipdeptree>=2.23.0 \ No newline at end of file diff --git a/ai-engine/requirements.txt b/ai-engine/requirements.txt index c6f209d9..41cbf971 100644 --- a/ai-engine/requirements.txt +++ b/ai-engine/requirements.txt @@ -40,7 +40,8 @@ validators>=0.22.0 # For URL validation (BedrockDocsScraper) ddgs>=5.3.0 # For actual web search functionality (replaces duckduckgo-search) # Redis for job management -redis>=4.5.0 +# Pin redis to avoid resolution issues with complex dependency graph +redis>=4.5.0,<6.0.0 # Configuration Management pydantic>=2.0.0 @@ -49,4 +50,14 @@ pydantic-settings # Monitoring prometheus-client psutil -structlog>=24.0.0 \ No newline at end of file +structlog>=24.0.0 + +# Distributed Tracing - OpenTelemetry +# Note: opentelemetry-exporter-jaeger 1.21.0 is the latest version compatible with Python 3.11 +opentelemetry-api>=1.24.0 +opentelemetry-sdk>=1.24.0 +opentelemetry-exporter-otlp>=1.24.0 +opentelemetry-exporter-jaeger==1.21.0 
+opentelemetry-instrumentation-fastapi>=0.45b0 +opentelemetry-instrumentation-httpx>=0.45b0 +opentelemetry-instrumentation-redis>=0.45b0 \ No newline at end of file diff --git a/ai-engine/tracing.py b/ai-engine/tracing.py new file mode 100644 index 00000000..d56a9009 --- /dev/null +++ b/ai-engine/tracing.py @@ -0,0 +1,320 @@ +""" +Distributed Tracing Service using OpenTelemetry for AI Engine. + +This module provides tracing capabilities for the AI Engine service, +including trace context propagation between services. +""" + +import os +from typing import Optional +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter +from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.exporter.jaeger.thrift import JaegerExporter +from opentelemetry.sdk.extension.aws.resource.ec2 import AwsEc2ResourceDetector +from opentelemetry.sdk.extension.aws.resource.ecs import AwsEcsResourceDetector +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor +from opentelemetry.instrumentation.redis import RedisInstrumentor +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from opentelemetry.trace import Status, StatusCode +from opentelemetry.context import Context +import logging + +logger = logging.getLogger(__name__) + +# Trace context propagator (W3C Trace Context) +tracer_propagator = TraceContextTextMapPropagator() + +# Global tracer instance +_tracer: Optional[trace.Tracer] = None +_tracer_provider: Optional[TracerProvider] = None + + +def get_tracer(service_name: str = "modporter-ai-engine") -> trace.Tracer: + """ + Get or create a tracer instance for the given service. 
+ + Args: + service_name: Name of the service for tracing + + Returns: + Configured tracer instance + """ + global _tracer, _tracer_provider + + if _tracer is not None: + return _tracer + + # Create resource with service information + service_version = os.getenv("SERVICE_VERSION", "1.0.0") + resource = Resource.create( + { + SERVICE_NAME: service_name, + SERVICE_VERSION: service_version, + } + ) + + # Add cloud metadata if available + try: + ec2_resource = AwsEc2ResourceDetector().detect() + resource = resource.merge(ec2_resource) + except Exception: + pass + + try: + ecs_resource = AwsEcsResourceDetector().detect() + resource = resource.merge(ecs_resource) + except Exception: + pass + + # Create tracer provider + _tracer_provider = TracerProvider(resource=resource) + + # Configure exporter based on environment + tracing_enabled = os.getenv("TRACING_ENABLED", "true").lower() == "true" + tracing_exporter = os.getenv("TRACING_EXPORTER", "jaeger").lower() + + if tracing_enabled: + if tracing_exporter == "jaeger": + # Jaeger exporter configuration + jaeger_host = os.getenv("JAEGER_HOST", "localhost") + jaeger_port = int(os.getenv("JAEGER_PORT", "6831")) + + jaeger_exporter = JaegerExporter( + agent_host_name=jaeger_host, + agent_port=jaeger_port, + ) + _tracer_provider.add_span_processor(BatchSpanProcessor(jaeger_exporter)) + logger.info(f"Jaeger tracing enabled: {jaeger_host}:{jaeger_port}") + + elif tracing_exporter == "otlp": + # OTLP exporter configuration + otlp_endpoint = os.getenv("OTLP_ENDPOINT", "http://localhost:4317") + + otlp_exporter = OTLPSpanExporter( + endpoint=otlp_endpoint, + insecure=True, + ) + _tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter)) + logger.info(f"OTLP tracing enabled: {otlp_endpoint}") + + # Add console exporter for development + if os.getenv("TRACING_CONSOLE", "false").lower() == "true": + _tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter())) + logger.info("Console span exporter enabled") + 
+ # Set the global tracer provider + trace.set_tracer_provider(_tracer_provider) + + # Create and return tracer + _tracer = trace.get_tracer(service_name) + + logger.info(f"Tracing initialized for service: {service_name}") + + return _tracer + + +def init_tracing( + app=None, + service_name: str = "modporter-ai-engine", + instrument_fastapi: bool = True, + instrument_httpx: bool = True, + instrument_redis: bool = True, +) -> trace.Tracer: + """ + Initialize tracing with automatic instrumentation. + + Args: + app: FastAPI application instance (optional) + service_name: Name of the service + instrument_fastapi: Whether to instrument FastAPI + instrument_httpx: Whether to instrument HTTPX + instrument_redis: Whether to instrument Redis + + Returns: + Configured tracer instance + """ + tracer = get_tracer(service_name) + + # Instrument FastAPI if app provided + if app and instrument_fastapi: + try: + FastAPIInstrumentor.instrument_app(app) + logger.info("FastAPI instrumentation enabled") + except Exception as e: + logger.warning(f"Failed to instrument FastAPI: {e}") + + # Instrument HTTPX + if instrument_httpx: + try: + HTTPXClientInstrumentor().instrument() + logger.info("HTTPX instrumentation enabled") + except Exception as e: + logger.warning(f"Failed to instrument HTTPX: {e}") + + # Instrument Redis + if instrument_redis: + try: + RedisInstrumentor().instrument() + logger.info("Redis instrumentation enabled") + except Exception as e: + logger.warning(f"Failed to instrument Redis: {e}") + + return tracer + + +def extract_trace_context(carrier: dict) -> Context: + """ + Extract trace context from carrier (e.g., HTTP headers). + + Args: + carrier: Dictionary containing trace context (e.g., HTTP headers) + + Returns: + Extracted context + """ + return tracer_propagator.extract(carrier) + + +def inject_trace_context(carrier: dict) -> dict: + """ + Inject trace context into carrier (e.g., HTTP headers). 
+ + Args: + carrier: Dictionary to inject trace context into + + Returns: + Carrier with injected trace context + """ + tracer_propagator.inject(carrier) + return carrier + + +def create_span( + name: str, + context: Optional[Context] = None, + kind: trace.SpanKind = trace.SpanKind.INTERNAL, +) -> trace.Span: + """ + Create a new span with the given name and context. + + Args: + name: Name of the span + context: Parent context (optional) + kind: Span kind + + Returns: + New span + """ + tracer = get_tracer() + + if context: + with tracer.start_as_current_span(name, context=context, kind=kind) as span: + return span + else: + with tracer.start_as_current_span(name, kind=kind) as span: + return span + + +def add_span_attributes(span: trace.Span, attributes: dict) -> None: + """ + Add attributes to a span. + + Args: + span: Span to add attributes to + attributes: Dictionary of attributes + """ + for key, value in attributes.items(): + if value is not None: + span.set_attribute(key, str(value)) + + +def record_span_exception(span: trace.Span, exception: Exception) -> None: + """ + Record an exception on a span. + + Args: + span: Span to record exception on + exception: Exception to record + """ + span.set_status(Status(StatusCode.ERROR, str(exception))) + span.record_exception(exception) + + +def shutdown_tracing() -> None: + """Shutdown the tracing provider and flush any pending spans.""" + global _tracer_provider + + if _tracer_provider: + _tracer_provider.shutdown() + logger.info("Tracing provider shutdown") + + +class TracingMiddleware: + """ + Middleware for FastAPI to handle trace context propagation. + + This middleware extracts trace context from incoming requests + and injects it into outgoing requests. 
+ """ + + def __init__(self, app): + self.app = app + + async def __call__(self, scope, receive, send): + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + # Extract trace context from headers + headers = dict(scope.get("headers", [])) + # Convert bytes to string for headers + headers = {k.decode(): v.decode() for k, v in headers.items()} + + # The FastAPI instrumentation will handle this automatically, + # but we keep this for custom use cases + context = extract_trace_context(headers) + + # Continue with the request + await self.app(scope, receive, send) + + +def get_current_span() -> Optional[trace.Span]: + """ + Get the current active span if any. + + Returns: + Current span or None + """ + return trace.get_current_span() + + +def get_trace_id() -> Optional[str]: + """ + Get the current trace ID as a hex string. + + Returns: + Trace ID or None + """ + span = get_current_span() + if span: + trace_id = span.get_span_context().trace_id + return format(trace_id, "032x") + return None + + +def get_span_id() -> Optional[str]: + """ + Get the current span ID as a hex string. + + Returns: + Span ID or None + """ + span = get_current_span() + if span: + span_id = span.get_span_context().span_id + return format(span_id, "016x") + return None diff --git a/ai-engine/utils/logging_config.py b/ai-engine/utils/logging_config.py index 713c5c0e..f1304700 100644 --- a/ai-engine/utils/logging_config.py +++ b/ai-engine/utils/logging_config.py @@ -38,7 +38,7 @@ def configure_structlog( ): """ Configure structlog for the AI engine. 
- + Args: log_level: Logging level (DEBUG, INFO, WARNING, ERROR) log_file: Path to log file (optional) @@ -47,36 +47,35 @@ def configure_structlog( """ if log_level is None: log_level = os.getenv("LOG_LEVEL", "INFO").upper() - + # Auto-detect JSON format in production if json_format is None: json_format = os.getenv("LOG_JSON_FORMAT", "false").lower() == "true" if os.getenv("ENVIRONMENT", "development") == "production": json_format = True - + # Get log directory log_dir = os.getenv("LOG_DIR", "/tmp/modporter-ai/logs") - + # Configure processors based on format + # Order matters: context merging -> logger info -> level -> timestamper -> renderer -> exception handling processors = [ structlog.contextvars.merge_contextvars, structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, ] - + if debug_mode: processors.append(structlog.dev.ConsoleRenderer()) elif json_format: processors.append(structlog.processors.JSONRenderer()) else: processors.append(structlog.dev.ConsoleRenderer(colors=False)) - - # Add exception info processor - processors.append(structlog.processors.StackInfoRenderer()) - processors.append(structlog.processors.format_exc_info) - + # Configure structlog structlog.configure( processors=processors, @@ -85,160 +84,169 @@ def configure_structlog( logger_factory=structlog.stdlib.LoggerFactory(), cache_logger_on_first_use=True, ) - + # Also configure standard library logging root_logger = logging.getLogger() root_logger.setLevel(getattr(logging, log_level, logging.INFO)) - + # Clear existing handlers root_logger.handlers.clear() - + # Console handler console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(getattr(logging, log_level, logging.INFO)) - console_handler.setFormatter(logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s", - 
datefmt="%Y-%m-%d %H:%M:%S" - )) + console_handler.setFormatter( + logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + ) root_logger.addHandler(console_handler) - + # File handler for production if log_file is None: os.makedirs(log_dir, exist_ok=True) log_file = os.path.join(log_dir, "ai-engine.log") - + file_handler = logging.handlers.RotatingFileHandler( log_file, - maxBytes=10 * 1024 * 1024, # 10MB + maxBytes=10 * 1024 * 1024, # 10MB backupCount=5, - encoding='utf-8' + encoding="utf-8", ) file_handler.setLevel(logging.INFO) - file_handler.setFormatter(logging.Formatter( - "%(message)s" - )) + file_handler.setFormatter(logging.Formatter("%(message)s")) root_logger.addHandler(file_handler) - + return structlog.get_logger() class AgentLogFormatter(logging.Formatter): """Custom formatter for agent logging with structured output""" - + def __init__(self, include_agent_context: bool = True): self.include_agent_context = include_agent_context super().__init__() - + def format(self, record: logging.LogRecord) -> str: # Base format with timestamp, level, and logger name - timestamp = datetime.fromtimestamp(record.created).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] - + timestamp = datetime.fromtimestamp(record.created).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + # Extract agent name from logger name (e.g., 'agents.java_analyzer' -> 'JavaAnalyzer') - logger_parts = record.name.split('.') - if 'agents' in logger_parts: - agent_idx = logger_parts.index('agents') + logger_parts = record.name.split(".") + if "agents" in logger_parts: + agent_idx = logger_parts.index("agents") if agent_idx + 1 < len(logger_parts): - agent_name = logger_parts[agent_idx + 1].replace('_', '').title() + agent_name = logger_parts[agent_idx + 1].replace("_", "").title() else: - agent_name = 'Agent' - elif 'crew' in logger_parts: - agent_name = 'Crew' + agent_name = "Agent" + elif "crew" in logger_parts: + agent_name = "Crew" else: - agent_name = 
record.name.split('.')[-1].title() - + agent_name = record.name.split(".")[-1].title() + # Build the log message base_msg = f"{timestamp} [{record.levelname}] {agent_name}: {record.getMessage()}" - + # Add extra context if available - if hasattr(record, 'agent_context') and self.include_agent_context: + if hasattr(record, "agent_context") and self.include_agent_context: context = record.agent_context if isinstance(context, dict): - context_str = json.dumps(context, separators=(',', ':')) + context_str = json.dumps(context, separators=(",", ":")) base_msg += f" | Context: {context_str}" - + # Add operation timing if available - if hasattr(record, 'operation_time'): + if hasattr(record, "operation_time"): base_msg += f" | Duration: {record.operation_time:.3f}s" - + # Add tool usage information if available - if hasattr(record, 'tool_name'): + if hasattr(record, "tool_name"): base_msg += f" | Tool: {record.tool_name}" - if hasattr(record, 'tool_result'): + if hasattr(record, "tool_result"): result_preview = str(record.tool_result)[:100] if len(str(record.tool_result)) > 100: result_preview += "..." 
base_msg += f" | Result: {result_preview}" - + return base_msg class AgentLogger: """Enhanced logger for agent operations with structured logging""" - + def __init__(self, name: str): self.logger = logging.getLogger(name) - self.agent_name = name.split('.')[-1].replace('_', '').title() - + self.agent_name = name.split(".")[-1].replace("_", "").title() + def info(self, message: str, **kwargs): """Log info message with optional context""" extra = self._build_extra(**kwargs) self.logger.info(message, extra=extra) - + def debug(self, message: str, **kwargs): """Log debug message with optional context""" extra = self._build_extra(**kwargs) self.logger.debug(message, extra=extra) - + def warning(self, message: str, **kwargs): """Log warning message with optional context""" extra = self._build_extra(**kwargs) self.logger.warning(message, extra=extra) - + def error(self, message: str, **kwargs): """Log error message with optional context""" extra = self._build_extra(**kwargs) self.logger.error(message, extra=extra) - + def log_operation_start(self, operation: str, **context): """Log the start of an operation""" self.info(f"Starting {operation}", agent_context=context) - + def log_operation_complete(self, operation: str, duration: float, **context): """Log the completion of an operation with timing""" self.info(f"Completed {operation}", operation_time=duration, agent_context=context) - + def log_tool_usage(self, tool_name: str, result: Any = None, duration: Optional[float] = None): """Log tool usage with results""" - kwargs = {'tool_name': tool_name} + kwargs = {"tool_name": tool_name} if result is not None: - kwargs['tool_result'] = result + kwargs["tool_result"] = result if duration is not None: - kwargs['operation_time'] = duration - + kwargs["operation_time"] = duration + self.info(f"Used tool: {tool_name}", **kwargs) - + def log_agent_decision(self, decision: str, reasoning: str, **context): """Log agent decision-making process""" log_context = context.copy() - 
log_context['decision'] = decision - log_context['reasoning'] = reasoning + log_context["decision"] = decision + log_context["reasoning"] = reasoning self.info(f"Decision: {decision}", agent_context=log_context) - - def log_data_transfer(self, from_agent: str, to_agent: str, data_type: str, data_size: Optional[int] = None): + + def log_data_transfer( + self, + from_agent: str, + to_agent: str, + data_type: str, + data_size: Optional[int] = None, + ): """Log data transfer between agents""" context = { - 'from_agent': from_agent, - 'to_agent': to_agent, - 'data_type': data_type + "from_agent": from_agent, + "to_agent": to_agent, + "data_type": data_type, } if data_size is not None: - context['data_size'] = data_size - - self.info(f"Data transfer: {data_type} from {from_agent} to {to_agent}", agent_context=context) - + context["data_size"] = data_size + + self.info( + f"Data transfer: {data_type} from {from_agent} to {to_agent}", + agent_context=context, + ) + def _build_extra(self, **kwargs) -> Dict[str, Any]: """Build extra dictionary for logging""" - valid_keys = {'agent_context', 'operation_time', 'tool_name', 'tool_result'} + valid_keys = {"agent_context", "operation_time", "tool_name", "tool_result"} extra = {key: value for key, value in kwargs.items() if key in valid_keys} return extra @@ -249,11 +257,11 @@ def setup_logging( enable_file_logging: bool = True, debug_mode: bool = False, max_file_size: int = 10 * 1024 * 1024, # 10MB - backup_count: int = 5 + backup_count: int = 5, ) -> None: """ Setup centralized logging configuration for the AI engine - + Args: log_level: Logging level (DEBUG, INFO, WARNING, ERROR) log_file: Path to log file (optional) @@ -262,30 +270,30 @@ def setup_logging( max_file_size: Maximum size of log file before rotation backup_count: Number of backup log files to keep """ - + # Determine log level if log_level is None: log_level = os.getenv("LOG_LEVEL", "DEBUG" if debug_mode else "INFO").upper() - + # Convert string level to logging 
constant numeric_level = getattr(logging, log_level, logging.INFO) - + # Create custom formatter formatter = AgentLogFormatter(include_agent_context=debug_mode) - + # Configure root logger root_logger = logging.getLogger() root_logger.setLevel(numeric_level) - + # Clear existing handlers root_logger.handlers.clear() - + # Console handler console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(numeric_level) console_handler.setFormatter(formatter) root_logger.addHandler(console_handler) - + # File handler (if enabled) if enable_file_logging: if log_file is None: @@ -293,36 +301,35 @@ def setup_logging( log_dir = Path(os.getenv("LOG_DIR", "/tmp/modporter-ai/logs")) log_dir.mkdir(parents=True, exist_ok=True) log_file = log_dir / "ai-engine.log" - + # Use rotating file handler to prevent huge log files file_handler = logging.handlers.RotatingFileHandler( - log_file, - maxBytes=max_file_size, - backupCount=backup_count, - encoding='utf-8' + log_file, maxBytes=max_file_size, backupCount=backup_count, encoding="utf-8" ) file_handler.setLevel(numeric_level) file_handler.setFormatter(formatter) root_logger.addHandler(file_handler) - + # Set specific logger levels for third-party libraries to reduce noise logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger("crewai").setLevel(logging.INFO if debug_mode else logging.WARNING) - + # Log the configuration logger = logging.getLogger(__name__) - logger.info(f"Logging configured - Level: {log_level}, Debug: {debug_mode}, File: {enable_file_logging}") + logger.info( + f"Logging configured - Level: {log_level}, Debug: {debug_mode}, File: {enable_file_logging}" + ) def get_agent_logger(agent_name: str) -> AgentLogger: """ Get a configured logger for an agent - + Args: agent_name: Name of the agent (e.g., 'java_analyzer', 'logic_translator') - + Returns: AgentLogger instance """ @@ 
-333,10 +340,10 @@ def get_agent_logger(agent_name: str) -> AgentLogger: def get_structlog_logger(name: str = None) -> structlog.BoundLogger: """ Get a structlog logger instance. - + Args: name: Logger name (optional) - + Returns: Configured structlog logger """ @@ -348,16 +355,16 @@ def get_structlog_logger(name: str = None) -> structlog.BoundLogger: def set_correlation_id(correlation_id: Optional[str] = None) -> str: """ Set the correlation ID for the current context. - + Args: correlation_id: Optional correlation ID to use - + Returns: The correlation ID (either provided or generated) """ if correlation_id is None: correlation_id = str(uuid.uuid4()) - + correlation_id_var.set(correlation_id) structlog.contextvars.clear_contextvars() structlog.contextvars.bind_contextvars(correlation_id=correlation_id) @@ -367,7 +374,7 @@ def set_correlation_id(correlation_id: Optional[str] = None) -> str: def get_correlation_id() -> Optional[str]: """ Get the current correlation ID from the context. - + Returns: Current correlation ID or None """ @@ -390,20 +397,21 @@ def get_crew_logger() -> AgentLogger: # Performance timing decorator def log_performance(operation_name: str = None): """Decorator to log operation performance""" + def decorator(func): def wrapper(*args, **kwargs): - + # Get logger from the class instance if available - if args and hasattr(args[0], 'logger'): + if args and hasattr(args[0], "logger"): logger = args[0].logger else: - logger = get_agent_logger(func.__module__.split('.')[-1]) - + logger = get_agent_logger(func.__module__.split(".")[-1]) + op_name = operation_name or func.__name__ start_time = time.time() - + logger.log_operation_start(op_name) - + try: result = func(*args, **kwargs) duration = time.time() - start_time @@ -413,8 +421,9 @@ def wrapper(*args, **kwargs): duration = time.time() - start_time logger.error(f"Operation {op_name} failed after {duration:.3f}s: {str(e)}") raise - + return wrapper + return decorator @@ -423,12 +432,13 @@ def 
wrapper(*args, **kwargs): # ============================================================================ # Context variable for tracking operation context across async boundaries -_operation_context: ContextVar[Dict[str, Any]] = ContextVar('operation_context', default={}) +_operation_context: ContextVar[Dict[str, Any]] = ContextVar("operation_context", default={}) @dataclass class AgentDecision: """Represents an agent decision for logging and analysis""" + agent_name: str decision_type: str decision: str @@ -443,6 +453,7 @@ class AgentDecision: @dataclass class ToolUsage: """Represents tool usage for logging and analysis""" + agent_name: str tool_name: str input_params: Dict[str, Any] @@ -458,32 +469,32 @@ class AgentLogAnalyzer: Log analysis tools for agent operations. Provides insights into agent behavior, decisions, and performance. """ - + def __init__(self): self.decisions: List[AgentDecision] = [] self.tool_usages: List[ToolUsage] = [] self._lock = threading.Lock() - + def record_decision(self, decision: AgentDecision): """Record an agent decision for analysis""" with self._lock: self.decisions.append(decision) - + def record_tool_usage(self, usage: ToolUsage): """Record tool usage for analysis""" with self._lock: self.tool_usages.append(usage) - + def get_agent_statistics(self, agent_name: str = None) -> Dict[str, Any]: """Get statistics for a specific agent or all agents""" with self._lock: decisions = self.decisions tool_usages = self.tool_usages - + if agent_name: decisions = [d for d in decisions if d.agent_name == agent_name] tool_usages = [t for t in tool_usages if t.agent_name == agent_name] - + # Calculate decision statistics decision_types = defaultdict(int) avg_confidence = 0.0 @@ -492,79 +503,81 @@ def get_agent_statistics(self, agent_name: str = None) -> Dict[str, Any]: decision_types[d.decision_type] += 1 if d.confidence is not None: avg_confidence += d.confidence - avg_confidence /= len([d for d in decisions if d.confidence is not None]) if 
decisions else 1 - + avg_confidence /= ( + len([d for d in decisions if d.confidence is not None]) if decisions else 1 + ) + # Calculate tool usage statistics tool_counts = defaultdict(int) - tool_success_rate = defaultdict(lambda: {'success': 0, 'total': 0}) + tool_success_rate = defaultdict(lambda: {"success": 0, "total": 0}) avg_tool_duration = defaultdict(list) - + for t in tool_usages: tool_counts[t.tool_name] += 1 - tool_success_rate[t.tool_name]['total'] += 1 + tool_success_rate[t.tool_name]["total"] += 1 if t.success: - tool_success_rate[t.tool_name]['success'] += 1 + tool_success_rate[t.tool_name]["success"] += 1 avg_tool_duration[t.tool_name].append(t.duration_ms) - + # Calculate average durations avg_durations = {} for tool, durations in avg_tool_duration.items(): avg_durations[tool] = sum(durations) / len(durations) if durations else 0 - + # Calculate success rates success_rates = {} for tool, counts in tool_success_rate.items(): - success_rates[tool] = counts['success'] / counts['total'] if counts['total'] > 0 else 0 - + success_rates[tool] = counts["success"] / counts["total"] if counts["total"] > 0 else 0 + return { - 'agent_name': agent_name or 'all', - 'total_decisions': len(decisions), - 'total_tool_usages': len(tool_usages), - 'decision_types': dict(decision_types), - 'average_confidence': avg_confidence, - 'tool_usage_counts': dict(tool_counts), - 'tool_success_rates': success_rates, - 'average_tool_durations_ms': avg_durations, + "agent_name": agent_name or "all", + "total_decisions": len(decisions), + "total_tool_usages": len(tool_usages), + "decision_types": dict(decision_types), + "average_confidence": avg_confidence, + "tool_usage_counts": dict(tool_counts), + "tool_success_rates": success_rates, + "average_tool_durations_ms": avg_durations, } - + def get_decision_trace(self, agent_name: str = None, limit: int = 100) -> List[Dict]: """Get a trace of recent decisions""" with self._lock: decisions = self.decisions.copy() - + if agent_name: 
decisions = [d for d in decisions if d.agent_name == agent_name] - + # Sort by timestamp (most recent first) and limit decisions = sorted(decisions, key=lambda d: d.timestamp, reverse=True)[:limit] - + return [asdict(d) for d in decisions] - + def get_tool_usage_trace(self, agent_name: str = None, limit: int = 100) -> List[Dict]: """Get a trace of recent tool usages""" with self._lock: usages = self.tool_usages.copy() - + if agent_name: usages = [u for u in usages if u.agent_name == agent_name] - + # Sort by timestamp (most recent first) and limit usages = sorted(usages, key=lambda u: u.timestamp, reverse=True)[:limit] - + return [asdict(u) for u in usages] - + def export_analysis(self, filepath: str): """Export analysis data to JSON file""" data = { - 'export_timestamp': datetime.now().isoformat(), - 'statistics': self.get_agent_statistics(), - 'decision_trace': self.get_decision_trace(limit=1000), - 'tool_usage_trace': self.get_tool_usage_trace(limit=1000), + "export_timestamp": datetime.now().isoformat(), + "statistics": self.get_agent_statistics(), + "decision_trace": self.get_decision_trace(limit=1000), + "tool_usage_trace": self.get_tool_usage_trace(limit=1000), } - - with open(filepath, 'w') as f: + + with open(filepath, "w") as f: json.dump(data, f, indent=2, default=str) - + def clear(self): """Clear all recorded data""" with self._lock: @@ -587,7 +600,7 @@ def get_log_analyzer() -> AgentLogAnalyzer: class EnhancedAgentLogger(AgentLogger): """ Enhanced agent logger with comprehensive logging capabilities. 
- + Features: - Structured logging with context - Decision and reasoning logging @@ -595,19 +608,19 @@ class EnhancedAgentLogger(AgentLogger): - Debug mode with verbose output - Integration with log analyzer """ - + def __init__(self, name: str, debug_mode: bool = False): super().__init__(name) self._debug_mode = debug_mode or os.getenv("AGENT_DEBUG_MODE", "false").lower() == "true" self._analyzer = get_log_analyzer() self._operation_stack: List[Dict[str, Any]] = [] self._lock = threading.Lock() - + def set_debug_mode(self, enabled: bool): """Enable or disable debug mode""" self._debug_mode = enabled self.debug(f"Debug mode {'enabled' if enabled else 'disabled'}") - + def log_decision( self, decision_type: str, @@ -615,11 +628,11 @@ def log_decision( reasoning: str, confidence: float = None, alternatives: List[str] = None, - **context + **context, ): """ Log an agent decision with reasoning. - + Args: decision_type: Type of decision (e.g., 'feature_mapping', 'code_translation') decision: The decision made @@ -638,33 +651,27 @@ def log_decision( alternatives_considered=alternatives or [], context=context, ) - + # Record for analysis self._analyzer.record_decision(decision_record) - + # Log the decision log_context = { - 'decision_type': decision_type, - 'confidence': confidence, - 'alternatives_count': len(alternatives) if alternatives else 0, - **context + "decision_type": decision_type, + "confidence": confidence, + "alternatives_count": len(alternatives) if alternatives else 0, + **context, } - + if self._debug_mode: # Verbose output in debug mode - self.info( - f"DECISION [{decision_type}]: {decision}", - agent_context=log_context - ) + self.info(f"DECISION [{decision_type}]: {decision}", agent_context=log_context) self.debug(f" Reasoning: {reasoning}") if alternatives: self.debug(f" Alternatives considered: {alternatives}") else: - self.info( - f"Decision: {decision_type} -> {decision}", - agent_context=log_context - ) - + self.info(f"Decision: {decision_type} 
-> {decision}", agent_context=log_context) + def log_tool_call( self, tool_name: str, @@ -672,11 +679,11 @@ def log_tool_call( result: Any = None, success: bool = True, duration_ms: float = 0, - error: str = None + error: str = None, ): """ Log a tool call with input/output details. - + Args: tool_name: Name of the tool called input_params: Parameters passed to the tool @@ -690,40 +697,42 @@ def log_tool_call( agent_name=self.agent_name, tool_name=tool_name, input_params=input_params, - output_result=result if not isinstance(result, str) else result[:1000], # Truncate long strings + output_result=( + result if not isinstance(result, str) else result[:1000] + ), # Truncate long strings success=success, duration_ms=duration_ms, error_message=error, ) - + # Record for analysis self._analyzer.record_tool_usage(usage_record) - + # Log the tool call if success: self.info( f"Tool call: {tool_name} ({duration_ms:.1f}ms)", tool_name=tool_name, - operation_time=duration_ms / 1000 + operation_time=duration_ms / 1000, ) else: self.error( f"Tool call failed: {tool_name} - {error}", tool_name=tool_name, - operation_time=duration_ms / 1000 + operation_time=duration_ms / 1000, ) - + # Debug mode: log input/output details if self._debug_mode: self.debug(f" Tool input: {json.dumps(input_params, default=str)[:500]}") if success and result is not None: result_str = str(result)[:500] self.debug(f" Tool output: {result_str}") - + def log_reasoning_step(self, step: str, details: str = None): """ Log a reasoning step in the agent's thought process. - + Args: step: Name/description of the reasoning step details: Additional details about the step @@ -732,11 +741,11 @@ def log_reasoning_step(self, step: str, details: str = None): self.debug(f"Reasoning: {step}") if details: self.debug(f" Details: {details}") - + def log_state_change(self, state_name: str, old_value: Any, new_value: Any, reason: str = None): """ Log a state change in the agent. 
- + Args: state_name: Name of the state variable old_value: Previous value @@ -744,77 +753,79 @@ def log_state_change(self, state_name: str, old_value: Any, new_value: Any, reas reason: Reason for the change """ context = { - 'state_name': state_name, - 'old_value': str(old_value)[:200], - 'new_value': str(new_value)[:200], + "state_name": state_name, + "old_value": str(old_value)[:200], + "new_value": str(new_value)[:200], } if reason: - context['reason'] = reason - + context["reason"] = reason + self.debug(f"State change: {state_name}", agent_context=context) - + def push_operation(self, operation: str, **context): """Push an operation onto the operation stack""" with self._lock: - self._operation_stack.append({ - 'operation': operation, - 'context': context, - 'start_time': time.time(), - }) + self._operation_stack.append( + { + "operation": operation, + "context": context, + "start_time": time.time(), + } + ) self.debug(f"Started operation: {operation}") - + def pop_operation(self, success: bool = True, error: str = None): """Pop an operation from the stack and log its completion""" with self._lock: if not self._operation_stack: self.warning("Attempted to pop operation from empty stack") return - + op = self._operation_stack.pop() - - duration = time.time() - op['start_time'] - operation = op['operation'] - + + duration = time.time() - op["start_time"] + operation = op["operation"] + if success: self.info(f"Completed operation: {operation} ({duration:.3f}s)") else: self.error(f"Failed operation: {operation} ({duration:.3f}s) - {error}") - + def get_current_operation(self) -> Optional[str]: """Get the current operation name""" with self._lock: if self._operation_stack: - return self._operation_stack[-1]['operation'] + return self._operation_stack[-1]["operation"] return None - + def log_error_with_trace(self, message: str, error: Exception = None): """ Log an error with full stack trace. 
- + Args: message: Error message error: Exception object (optional) """ self.error(message) - + if error and self._debug_mode: # Log full stack trace in debug mode trace = traceback.format_exc() self.debug(f"Stack trace:\n{trace}") - + def log_performance_metric(self, metric_name: str, value: float, unit: str = None): """ Log a performance metric. - + Args: metric_name: Name of the metric value: Metric value unit: Unit of measurement """ context = { - 'metric_name': metric_name, - 'metric_value': value, - 'metric_unit': unit, + "metric_name": metric_name, + "metric_value": value, + "metric_unit": unit, } self.debug(f"Performance: {metric_name} = {value}{unit or ''}", agent_context=context) @@ -822,11 +833,11 @@ def log_performance_metric(self, metric_name: str, value: float, unit: str = Non def get_enhanced_agent_logger(agent_name: str, debug_mode: bool = False) -> EnhancedAgentLogger: """ Get an enhanced agent logger with comprehensive logging capabilities. - + Args: agent_name: Name of the agent debug_mode: Enable debug mode for verbose output - + Returns: EnhancedAgentLogger instance """ @@ -847,7 +858,7 @@ def disable_global_debug_mode(): def export_log_analysis(filepath: str = None): """ Export log analysis to a JSON file. - + Args: filepath: Path to export file (optional, defaults to log directory) """ @@ -855,23 +866,23 @@ def export_log_analysis(filepath: str = None): log_dir = Path(os.getenv("LOG_DIR", "/tmp/modporter-ai/logs")) log_dir.mkdir(parents=True, exist_ok=True) filepath = log_dir / f"agent_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" - + analyzer = get_log_analyzer() analyzer.export_analysis(str(filepath)) - + logger = logging.getLogger(__name__) logger.info(f"Log analysis exported to: {filepath}") - + return filepath def get_agent_log_summary(agent_name: str = None) -> Dict[str, Any]: """ Get a summary of agent logging activity. 
- + Args: agent_name: Specific agent name (optional, returns all if not specified) - + Returns: Dictionary with agent statistics """ diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 63f8b0e3..524d5bb9 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -73,11 +73,15 @@ exclude = [ ] [tool.ruff.lint] -# Enable pycodestyle (E), Pyflakes (F), isort (I), flake8-quotes (Q), flake8-naming (N), pydocstyle (D), pyupgrade (UP) -select = ["E", "F", "I", "Q", "N", "D", "UP"] +# Enable pycodestyle (E), Pyflakes (F), warnings (W), isort (I), flake8-quotes (Q), flake8-naming (N), pydocstyle (D), pyupgrade (UP) +select = ["E", "F", "W", "I", "Q", "N", "D", "UP"] ignore = [ "E402", # module level import not at top of file "E501", # line too long (handled by formatter) + # W (pycodestyle warnings) ignores for legacy code + "W291", # trailing whitespace (legacy code has inconsistent whitespace) + "W292", # no newline at end of file (legacy code has inconsistent newlines) + "W293", # blank line contains whitespace (legacy code has inconsistent blank lines) "Q000", # bad quote marks "Q001", # bad quote marks in multiline strings "Q002", # bad quote marks in strings diff --git a/backend/requirements-dev.txt b/backend/requirements-dev.txt index 0d08d43f..fb588267 100644 --- a/backend/requirements-dev.txt +++ b/backend/requirements-dev.txt @@ -13,3 +13,7 @@ pre-commit>=3.0.0 # Logging structlog + +# Dependency checking +pip-audit>=2.7.0 +pipdeptree>=2.23.0 diff --git a/backend/requirements.txt b/backend/requirements.txt index 30d6bc8e..d9dc5e73 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -33,6 +33,16 @@ prometheus_client>=0.17.0 sentry-sdk[fastapi]>=2.0.0 structlog>=24.0.0 +# Distributed Tracing - OpenTelemetry +# Note: opentelemetry-exporter-jaeger 1.21.0 is the latest version compatible with Python 3.11 +opentelemetry-api>=1.24.0 +opentelemetry-sdk>=1.24.0 +opentelemetry-exporter-otlp>=1.24.0 
+opentelemetry-exporter-jaeger==1.21.0 +opentelemetry-instrumentation-fastapi>=0.45b0 +opentelemetry-instrumentation-httpx>=0.45b0 +opentelemetry-instrumentation-redis>=0.45b0 + # Testing pytest>=8.2 pytest-asyncio==1.3.0 diff --git a/backend/src/main.py b/backend/src/main.py index de2abfb6..4c48bb47 100644 --- a/backend/src/main.py +++ b/backend/src/main.py @@ -48,6 +48,7 @@ ) from services.security_headers import SecurityHeadersMiddleware from services.logging_middleware import LoggingMiddleware, RequestContextMiddleware +from services.tracing import init_tracing, shutdown_tracing # Import API routers from api import ( @@ -101,8 +102,15 @@ async def lifespan(app: FastAPI): if testing_env != "true": await init_db() logger.info("Database initialized") + + # Initialize tracing + init_tracing(app=app, service_name="modporter-backend") + logger.info("Distributed tracing initialized") + yield + # Shutdown + shutdown_tracing() logger.info("Application shutdown") diff --git a/backend/src/services/ai_engine_client.py b/backend/src/services/ai_engine_client.py index e517e5aa..48ab6211 100644 --- a/backend/src/services/ai_engine_client.py +++ b/backend/src/services/ai_engine_client.py @@ -3,6 +3,7 @@ Provides HTTP client for communicating with the AI Engine API. Handles file transfers, conversion requests, and progress polling. +Includes distributed tracing support. """ import asyncio @@ -13,6 +14,8 @@ import httpx +from services.tracing import inject_trace_context, get_trace_id + logger = logging.getLogger(__name__) # AI Engine configuration @@ -63,6 +66,21 @@ async def _get_client(self) -> httpx.AsyncClient: ) return self._client + def _get_trace_headers(self) -> Dict[str, str]: + """ + Get trace context headers for propagating to downstream services. 
+ + Returns: + Dictionary of trace headers to include in requests + """ + headers = {} + inject_trace_context(headers) + # Also add trace_id for logging purposes + trace_id = get_trace_id() + if trace_id: + headers["X-Trace-ID"] = trace_id + return headers + async def close(self): """Close the HTTP client.""" if self._client and not self._client.is_closed: @@ -78,7 +96,8 @@ async def health_check(self) -> bool: """ try: client = await self._get_client() - response = await client.get("/api/v1/health") + headers = self._get_trace_headers() + response = await client.get("/api/v1/health", headers=headers) return response.status_code == 200 except Exception as e: logger.warning(f"AI Engine health check failed: {e}") @@ -108,6 +127,7 @@ async def start_conversion( """ try: client = await self._get_client() + headers = self._get_trace_headers() request_data = { "job_id": job_id, @@ -121,6 +141,7 @@ async def start_conversion( response = await client.post( "/api/v1/convert", json=request_data, + headers=headers, ) if response.status_code != 200: @@ -161,7 +182,8 @@ async def get_conversion_status(self, job_id: str) -> Dict[str, Any]: """ try: client = await self._get_client() - response = await client.get(f"/api/v1/status/{job_id}") + headers = self._get_trace_headers() + response = await client.get(f"/api/v1/status/{job_id}", headers=headers) if response.status_code == 404: raise AIEngineError("Job not found", status_code=404) @@ -279,7 +301,11 @@ async def poll_conversion_status( except AIEngineError as e: if e.status_code == 404: # Job not found - treat as terminal - yield {"status": "failed", "message": "Job not found", "progress": 0} + yield { + "status": "failed", + "message": "Job not found", + "progress": 0, + } break raise diff --git a/backend/src/services/structured_logging.py b/backend/src/services/structured_logging.py index 3ea81f2b..dc1ff56a 100644 --- a/backend/src/services/structured_logging.py +++ b/backend/src/services/structured_logging.py @@ -56,12 
+56,15 @@ def configure_structlog( log_dir = os.getenv("LOG_DIR", "/var/log/modporter") # Configure processors based on format + # Order matters: context merging -> logger info -> level -> timestamper -> renderer -> exception handling processors = [ structlog.contextvars.merge_contextvars, structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, ] if debug_mode: @@ -71,10 +74,6 @@ def configure_structlog( else: processors.append(structlog.dev.ConsoleRenderer(colors=False)) - # Add exception info processor - processors.append(structlog.processors.StackInfoRenderer()) - processors.append(structlog.processors.format_exc_info) - # Configure structlog structlog.configure( processors=processors, @@ -99,7 +98,8 @@ def configure_structlog( else: console_handler.setFormatter( logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S" + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", ) ) root_logger.addHandler(console_handler) @@ -207,7 +207,8 @@ def get_standard_logger(name: str) -> logging.Logger: console_handler.setLevel(logging.INFO) console_formatter = logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S" + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", ) console_handler.setFormatter(console_formatter) logger.addHandler(console_handler) diff --git a/backend/src/services/tracing.py b/backend/src/services/tracing.py new file mode 100644 index 00000000..dca50ddc --- /dev/null +++ b/backend/src/services/tracing.py @@ -0,0 +1,320 @@ +""" +Distributed Tracing Service using OpenTelemetry. + +This module provides tracing capabilities for the ModPorter AI application, +including trace context propagation between services. 
+""" + +import os +from typing import Optional +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter +from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.exporter.jaeger.thrift import JaegerExporter +from opentelemetry.sdk.extension.aws.resource.ec2 import AwsEc2ResourceDetector +from opentelemetry.sdk.extension.aws.resource.ecs import AwsEcsResourceDetector +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor +from opentelemetry.instrumentation.redis import RedisInstrumentor +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from opentelemetry.trace import Status, StatusCode +from opentelemetry.context import Context +import logging + +logger = logging.getLogger(__name__) + +# Trace context propagator (W3C Trace Context) +tracer_propagator = TraceContextTextMapPropagator() + +# Global tracer instance +_tracer: Optional[trace.Tracer] = None +_tracer_provider: Optional[TracerProvider] = None + + +def get_tracer(service_name: str = "modporter-backend") -> trace.Tracer: + """ + Get or create a tracer instance for the given service. 
+ + Args: + service_name: Name of the service for tracing + + Returns: + Configured tracer instance + """ + global _tracer, _tracer_provider + + if _tracer is not None: + return _tracer + + # Create resource with service information + service_version = os.getenv("SERVICE_VERSION", "1.0.0") + resource = Resource.create( + { + SERVICE_NAME: service_name, + SERVICE_VERSION: service_version, + } + ) + + # Add cloud metadata if available + try: + ec2_resource = AwsEc2ResourceDetector().detect() + resource = resource.merge(ec2_resource) + except Exception: + pass + + try: + ecs_resource = AwsEcsResourceDetector().detect() + resource = resource.merge(ecs_resource) + except Exception: + pass + + # Create tracer provider + _tracer_provider = TracerProvider(resource=resource) + + # Configure exporter based on environment + tracing_enabled = os.getenv("TRACING_ENABLED", "true").lower() == "true" + tracing_exporter = os.getenv("TRACING_EXPORTER", "jaeger").lower() + + if tracing_enabled: + if tracing_exporter == "jaeger": + # Jaeger exporter configuration + jaeger_host = os.getenv("JAEGER_HOST", "localhost") + jaeger_port = int(os.getenv("JAEGER_PORT", "6831")) + + jaeger_exporter = JaegerExporter( + agent_host_name=jaeger_host, + agent_port=jaeger_port, + ) + _tracer_provider.add_span_processor(BatchSpanProcessor(jaeger_exporter)) + logger.info(f"Jaeger tracing enabled: {jaeger_host}:{jaeger_port}") + + elif tracing_exporter == "otlp": + # OTLP exporter configuration + otlp_endpoint = os.getenv("OTLP_ENDPOINT", "http://localhost:4317") + + otlp_exporter = OTLPSpanExporter( + endpoint=otlp_endpoint, + insecure=True, + ) + _tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter)) + logger.info(f"OTLP tracing enabled: {otlp_endpoint}") + + # Add console exporter for development + if os.getenv("TRACING_CONSOLE", "false").lower() == "true": + _tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter())) + logger.info("Console span exporter enabled") + 
+    # Publish the provider as the process-wide global tracer provider.
+    trace.set_tracer_provider(_tracer_provider)
+
+    # Cache the tracer in the module-level global (see the early return above).
+    _tracer = trace.get_tracer(service_name)
+
+    logger.info(f"Tracing initialized for service: {service_name}")
+
+    return _tracer
+
+
+def init_tracing(
+    app=None,
+    service_name: str = "modporter-backend",
+    instrument_fastapi: bool = True,
+    instrument_httpx: bool = True,
+    instrument_redis: bool = True,
+) -> trace.Tracer:
+    """
+    Initialize tracing with automatic instrumentation.
+
+    Args:
+        app: FastAPI application instance (optional)
+        service_name: Name of the service
+        instrument_fastapi: Whether to instrument FastAPI
+        instrument_httpx: Whether to instrument HTTPX
+        instrument_redis: Whether to instrument Redis
+
+    Returns:
+        Configured tracer instance
+    """
+    tracer = get_tracer(service_name)
+
+    # Instrument the FastAPI app only when an app instance was supplied.
+    if app and instrument_fastapi:
+        try:
+            FastAPIInstrumentor.instrument_app(app)
+            logger.info("FastAPI instrumentation enabled")
+        except Exception as e:
+            logger.warning(f"Failed to instrument FastAPI: {e}")
+
+    # Patch the HTTPX client library (global, process-wide instrumentation).
+    if instrument_httpx:
+        try:
+            HTTPXClientInstrumentor().instrument()
+            logger.info("HTTPX instrumentation enabled")
+        except Exception as e:
+            logger.warning(f"Failed to instrument HTTPX: {e}")
+
+    # Patch the Redis client library (global, process-wide instrumentation).
+    if instrument_redis:
+        try:
+            RedisInstrumentor().instrument()
+            logger.info("Redis instrumentation enabled")
+        except Exception as e:
+            logger.warning(f"Failed to instrument Redis: {e}")
+
+    return tracer
+
+
+def extract_trace_context(carrier: dict) -> Context:
+    """
+    Extract trace context from carrier (e.g., HTTP headers).
+
+    Args:
+        carrier: Dictionary containing trace context (e.g., HTTP headers)
+
+    Returns:
+        Extracted context
+    """
+    return tracer_propagator.extract(carrier)
+
+
+def inject_trace_context(carrier: dict) -> dict:
+    """
+    Inject trace context into carrier (e.g., HTTP headers).
+
+    Args:
+        carrier: Dictionary to inject trace context into
+
+    Returns:
+        Carrier with injected trace context
+    """
+    tracer_propagator.inject(carrier)
+    return carrier
+
+
+def create_span(
+    name: str,
+    context: Optional[Context] = None,
+    kind: trace.SpanKind = trace.SpanKind.INTERNAL,
+) -> trace.Span:
+    """
+    Create a new span with the given name and context.
+
+    The returned span is live; the caller must end it (e.g. span.end()).
+
+    Args:
+        name: Name of the span
+        context: Parent context (optional; None means the current context)
+        kind: Span kind
+
+    Returns:
+        New, not-yet-ended span
+    """
+    tracer = get_tracer()
+
+    # Bug fix: returning from inside ``with start_as_current_span(...)`` handed
+    # callers a span the context manager had already ended. start_span() keeps
+    # it live, and context=None already falls back to the current context.
+    return tracer.start_span(name, context=context, kind=kind)
+
+
+def add_span_attributes(span: trace.Span, attributes: dict) -> None:
+    """
+    Add attributes to a span.
+
+    Args:
+        span: Span to add attributes to
+        attributes: Dictionary of attributes
+    """
+    for key, value in attributes.items():
+        if value is not None:
+            span.set_attribute(key, str(value))
+
+
+def record_span_exception(span: trace.Span, exception: Exception) -> None:
+    """
+    Record an exception on a span.
+
+    Args:
+        span: Span to record exception on
+        exception: Exception to record
+    """
+    span.set_status(Status(StatusCode.ERROR, str(exception)))
+    span.record_exception(exception)
+
+
+def shutdown_tracing() -> None:
+    """Shutdown the tracing provider and flush any pending spans."""
+    global _tracer_provider
+
+    if _tracer_provider:
+        _tracer_provider.shutdown()
+        logger.info("Tracing provider shutdown")
+
+
+class TracingMiddleware:
+    """
+    Middleware for FastAPI to handle trace context propagation.
+
+    This middleware extracts trace context from incoming requests
+    and injects it into outgoing requests.
+    """
+
+    def __init__(self, app):
+        self.app = app
+
+    async def __call__(self, scope, receive, send):
+        if scope["type"] != "http":
+            await self.app(scope, receive, send)
+            return
+
+        # Extract trace context from headers
+        headers = dict(scope.get("headers", []))
+        # Convert bytes to string for headers
+        headers = {k.decode(): v.decode() for k, v in headers.items()}
+
+        # The FastAPI instrumentation will handle this automatically,
+        # but we keep this for custom use cases
+        context = extract_trace_context(headers)
+
+        # Continue with the request
+        await self.app(scope, receive, send)
+
+
+def get_current_span() -> Optional[trace.Span]:
+    """
+    Get the current active span if any.
+
+    Returns:
+        Current span (the INVALID_SPAN sentinel when none is active)
+    """
+    return trace.get_current_span()
+
+
+def get_trace_id() -> Optional[str]:
+    """
+    Get the current trace ID as a hex string.
+
+    Returns:
+        Trace ID, or None if there is no valid active span
+    """
+    # Bug fix: INVALID_SPAN is truthy (with all-zero IDs), so a plain
+    # truthiness check returned "0" * 32; check span-context validity instead.
+    ctx = get_current_span().get_span_context()
+    if ctx.is_valid:
+        return format(ctx.trace_id, "032x")
+    return None
+
+
+def get_span_id() -> Optional[str]:
+    """
+    Get the current span ID as a hex string.
+
+    Returns:
+        Span ID, or None if there is no valid active span
+    """
+    # Bug fix: INVALID_SPAN is truthy (with all-zero IDs), so check
+    # span-context validity, not truthiness, to avoid returning "0" * 16.
+    ctx = get_current_span().get_span_context()
+    if ctx.is_valid:
+        return format(ctx.span_id, "016x")
+    return None
diff --git a/docker-compose.yml b/docker-compose.yml
index 5e3245ad..194bcae8 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -26,11 +26,17 @@ services:
       - REDIS_URL=redis://redis:6379
       - DATABASE_URL=postgresql+asyncpg://postgres:password@postgres:5432/modporter
       - LOG_LEVEL=INFO
+      - TRACING_ENABLED=true
+      - TRACING_EXPORTER=jaeger
+      - JAEGER_HOST=jaeger
+      - JAEGER_PORT=6831
     depends_on:
       redis:
         condition: service_healthy
       postgres:
         condition: service_healthy
+      jaeger:
+        condition: service_started
     volumes:
       - ./backend/src:/app/src
       - conversion-cache:/app/cache
@@ -59,9 +65,15 @@ services:
       - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
       - LOG_LEVEL=INFO
      - PORT=8001
+      - TRACING_ENABLED=true
+      - TRACING_EXPORTER=jaeger
+      - JAEGER_HOST=jaeger
+      - JAEGER_PORT=6831
     depends_on:
       redis:
         condition: service_healthy
+      jaeger:
+        condition: service_started
     volumes:
       - model-cache:/app/models
       - temp-uploads:/app/temp_uploads
@@ -120,6 +132,34 @@ services:
       retries: 5
       start_period: 30s
 
+  # Jaeger for distributed tracing visualization
+  jaeger:
+    image: jaegertracing/all-in-one:1.52
+    ports:
+      - "6831:6831/udp"  # Jaeger thrift compact protocol
+      - "16686:16686"  # Jaeger UI
+      - "4317:4317"  # OTLP gRPC receiver
+      - "4318:4318"  # OTLP HTTP receiver
+    environment:
+      - COLLECTOR_OTLP_ENABLED=true
+      - SPAN_STORAGE_TYPE=badger
+    volumes:
+      - jaeger-data:/var/lib/jaeger
+    networks:
+      - modporter-network
+    restart: unless-stopped
+    healthcheck:
+      test: ["CMD", "wget", "--spider", "-q", "http://localhost:16686"]
+      interval: 30s
+      timeout: 10s
+      retries: 3
+    deploy:
+      resources:
+        limits:
+          memory: 512M
+        reservations:
+          memory: 256M
+
 volumes:
   redis-data:
     driver: local
@@ -133,6 +173,8 @@ volumes:
   driver: local
   conversion-outputs:
     driver: local
+  jaeger-data:
+    driver: local
 
 networks:
   modporter-network: