diff --git a/Examples/11-mcp-integration-guide.mdx b/Examples/11-mcp-integration-guide.mdx new file mode 100644 index 0000000..807fb43 --- /dev/null +++ b/Examples/11-mcp-integration-guide.mdx @@ -0,0 +1,526 @@ +--- +title: "MCP Integration with MultiMCPTools" +description: "Comprehensive guide for integrating MCP capabilities using MultiMCPTools with lifecycle decorators" +icon: "network-wired" +--- + + + **Integration Guide Summary** + + - **Goal**: Implement MCP (Model Context Protocol) integration using MultiMCPTools + - **SDK Version**: v2.0.32 or higher + - **Prerequisites**: xpander-sdk, knowledge of MCP protocol + - **Reference**: XPander issue #401 + + +## Overview + +The MultiMCPTools integration provides a streamlined way to implement MCP (Model Context Protocol) capabilities in your XPander agents. This guide covers the complete integration process, from basic setup to advanced lifecycle management with proper error handling and status monitoring. + +## Key Features + + + +Seamless integration with multiple MCP servers and tools + + + +Built-in support for on_boot, on_shutdown, and on_task decorators + + + +Real-time status monitoring and health checks for MCP connections + + + +Robust error handling with automatic reconnection capabilities + + + +## Installation and Setup + + + + +### Environment Setup +```bash +python3 -m venv .venv +source .venv/bin/activate +pip install "xpander-sdk[agno]>=2.0.32" +``` + +### Required Dependencies +The MultiMCPTools integration requires the following packages: +- `xpander-sdk` (v2.0.32+) +- `mcp-client` (automatically installed) +- `asyncio` (built-in) + + + + +## Implementation Example + +Here's the canonical implementation example demonstrating MCP integration with lifecycle decorators: + +```python mcp_integration_example.py +from dotenv import load_dotenv +load_dotenv() + +from xpander_sdk import Backend, on_boot, on_shutdown, on_task +from xpander_sdk.tools import MultiMCPTools +from agno.agent import Agent 
+import asyncio +import logging + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Global MCP tools instance +mcp_tools = None +mcp_status = {"connected": False, "servers": {}} + +@on_boot +async def initialize_mcp(): + """Initialize MCP connections during application boot.""" + global mcp_tools, mcp_status + + logger.info("๐Ÿ”Œ Initializing MCP connections...") + + try: + # Configure MCP servers + mcp_servers = { + "filesystem": { + "command": "npx", + "args": ["@modelcontextprotocol/server-filesystem", "/tmp"] + }, + "github": { + "command": "npx", + "args": ["@modelcontextprotocol/server-github"] + }, + "brave": { + "command": "npx", + "args": ["@modelcontextprotocol/server-brave-search"] + } + } + + # Initialize MultiMCPTools + mcp_tools = MultiMCPTools(servers=mcp_servers) + await mcp_tools.connect_all() + + # Update status tracking + mcp_status["connected"] = True + for server_name in mcp_servers.keys(): + mcp_status["servers"][server_name] = { + "status": "connected", + "last_ping": "boot_time", + "tools_available": len(await mcp_tools.list_tools(server_name)) + } + + logger.info(f"โœ… MCP initialization complete. 
Connected to {len(mcp_servers)} servers") + + except Exception as e: + logger.error(f"โŒ MCP initialization failed: {e}") + mcp_status["connected"] = False + raise + +@on_boot +def validate_mcp_environment(): + """Validate MCP-specific environment variables.""" + import os + + logger.info("๐Ÿ” Validating MCP environment...") + + # Check for optional MCP configuration + mcp_config_path = os.getenv("MCP_CONFIG_PATH", "~/.mcp/config.json") + logger.info(f"๐Ÿ“ MCP config path: {mcp_config_path}") + + # Validate GitHub token for MCP GitHub server (if used) + github_token = os.getenv("GITHUB_TOKEN") + if github_token: + logger.info("โœ… GitHub token found for MCP integration") + else: + logger.warning("โš ๏ธ No GitHub token found - MCP GitHub server may have limited functionality") + + logger.info("โœ… MCP environment validation complete") + +@on_task +async def handle_mcp_request(task): + """Handle incoming tasks using MCP capabilities.""" + global mcp_tools, mcp_status + + logger.info(f"๐Ÿ“จ Processing MCP task: {task.id}") + + if not mcp_status["connected"] or not mcp_tools: + task.result = { + "error": "MCP tools not available", + "status": "failed", + "message": "MCP integration is not initialized" + } + return task + + try: + # Analyze task to determine which MCP server to use + task_text = task.input.text.lower() + + if "file" in task_text or "directory" in task_text: + # Use filesystem MCP server + server_name = "filesystem" + available_tools = await mcp_tools.list_tools(server_name) + + if "read_file" in [tool["name"] for tool in available_tools]: + # Example: reading a file + if "read" in task_text: + file_path = extract_file_path(task.input.text) # Custom helper function + result = await mcp_tools.call_tool(server_name, "read_file", {"path": file_path}) + + task.result = { + "content": result["content"], + "file_path": file_path, + "server_used": server_name, + "status": "completed" + } + + elif "github" in task_text or "repository" in task_text: + # Use 
GitHub MCP server + server_name = "github" + available_tools = await mcp_tools.list_tools(server_name) + + if "search_repositories" in [tool["name"] for tool in available_tools]: + # Example: searching GitHub repositories + query = extract_github_query(task.input.text) # Custom helper function + result = await mcp_tools.call_tool(server_name, "search_repositories", {"query": query}) + + task.result = { + "repositories": result["items"], + "query": query, + "server_used": server_name, + "status": "completed" + } + + elif "search" in task_text: + # Use Brave search MCP server + server_name = "brave" + available_tools = await mcp_tools.list_tools(server_name) + + if "brave_search" in [tool["name"] for tool in available_tools]: + query = extract_search_query(task.input.text) # Custom helper function + result = await mcp_tools.call_tool(server_name, "brave_search", {"query": query}) + + task.result = { + "search_results": result["results"], + "query": query, + "server_used": server_name, + "status": "completed" + } + + else: + # Default handling - list available capabilities + all_capabilities = {} + for server in mcp_status["servers"]: + if mcp_status["servers"][server]["status"] == "connected": + tools = await mcp_tools.list_tools(server) + all_capabilities[server] = [tool["name"] for tool in tools] + + task.result = { + "message": "MCP integration ready", + "available_capabilities": all_capabilities, + "status": "info" + } + + except Exception as e: + logger.error(f"โŒ MCP task processing failed: {e}") + task.result = { + "error": str(e), + "status": "failed", + "server_status": mcp_status + } + + logger.info(f"โœ… Task {task.id} completed") + return task + +@on_shutdown +async def cleanup_mcp_connections(): + """Clean up MCP connections during shutdown.""" + global mcp_tools, mcp_status + + logger.info("๐Ÿ”Œ Shutting down MCP connections...") + + if mcp_tools: + try: + await mcp_tools.disconnect_all() + logger.info("โœ… All MCP connections closed") + except 
Exception as e: + logger.error(f"โŒ Error closing MCP connections: {e}") + + # Reset status + mcp_status = {"connected": False, "servers": {}} + mcp_tools = None + +@on_shutdown +def save_mcp_metrics(): + """Save MCP usage metrics before shutdown.""" + global mcp_status + + logger.info("๐Ÿ“Š Saving MCP metrics...") + + if mcp_status.get("servers"): + total_servers = len(mcp_status["servers"]) + connected_servers = sum(1 for s in mcp_status["servers"].values() if s.get("status") == "connected") + + metrics = { + "total_servers": total_servers, + "connected_servers": connected_servers, + "connection_rate": connected_servers / total_servers if total_servers > 0 else 0 + } + + logger.info(f"๐Ÿ“ˆ MCP metrics: {metrics}") + # In production, save to persistent storage or monitoring system + +# Helper functions for task processing +def extract_file_path(text): + """Extract file path from task text.""" + # Simple implementation - in production, use more sophisticated parsing + words = text.split() + for word in words: + if "/" in word and not word.startswith("http"): + return word + return "/tmp/example.txt" # Default for demo + +def extract_github_query(text): + """Extract GitHub search query from task text.""" + # Remove common words and extract meaningful search terms + stop_words = {"github", "repository", "search", "find", "look", "for"} + words = [w for w in text.lower().split() if w not in stop_words] + return " ".join(words[:5]) # Limit to first 5 meaningful words + +def extract_search_query(text): + """Extract search query from task text.""" + # Remove the word "search" and return the rest + return text.replace("search", "").strip() + +# Initialize backend and agent +backend = Backend() +agno_agent = Agent(**backend.get_args()) + +# The agent is now ready with MCP integration +logger.info("๐Ÿš€ Agent ready with MCP integration!") +logger.info("๐Ÿ“‹ Try asking: 'Search for Python MCP examples on GitHub'") +logger.info("๐Ÿ“‹ Try asking: 'Read the file 
/tmp/config.json'") +logger.info("๐Ÿ“‹ Try asking: 'Search for MCP protocol documentation'") + +# Example usage +if __name__ == "__main__": + agno_agent.print_response(message="Search for Python MCP examples on GitHub") +``` + +## Status Management and Health Checks + +Monitor your MCP integration health with these utilities: + +```python mcp_status_monitoring.py +import asyncio +from datetime import datetime + +@on_task +async def check_mcp_health(task): + """Health check endpoint for MCP integration.""" + global mcp_tools, mcp_status + + health_report = { + "timestamp": datetime.now().isoformat(), + "overall_status": "healthy" if mcp_status["connected"] else "unhealthy", + "servers": {} + } + + if mcp_tools: + for server_name in mcp_status["servers"]: + try: + # Ping server + tools = await mcp_tools.list_tools(server_name) + health_report["servers"][server_name] = { + "status": "healthy", + "tools_count": len(tools), + "last_check": datetime.now().isoformat() + } + except Exception as e: + health_report["servers"][server_name] = { + "status": "unhealthy", + "error": str(e), + "last_check": datetime.now().isoformat() + } + + task.result = health_report + return task + +async def periodic_health_check(): + """Periodic health check for MCP connections.""" + while True: + if mcp_tools and mcp_status["connected"]: + try: + # Check each server + for server_name in mcp_status["servers"]: + await mcp_tools.list_tools(server_name) + mcp_status["servers"][server_name]["last_ping"] = datetime.now().isoformat() + + logger.info("๐Ÿ” MCP health check passed") + except Exception as e: + logger.warning(f"โš ๏ธ MCP health check failed: {e}") + # Attempt reconnection logic here + + await asyncio.sleep(300) # Check every 5 minutes + +# Start health monitoring in background +@on_boot +async def start_health_monitoring(): + """Start background health monitoring.""" + asyncio.create_task(periodic_health_check()) + logger.info("๐Ÿ“Š MCP health monitoring started") +``` + +## 
Configuration Examples + +### Multiple Server Configuration + +```python mcp_multi_server_config.py +# Advanced MCP server configuration +mcp_servers = { + "filesystem": { + "command": "npx", + "args": ["@modelcontextprotocol/server-filesystem", "/workspace"], + "env": {"FILESYSTEM_ROOT": "/workspace"} + }, + "github": { + "command": "npx", + "args": ["@modelcontextprotocol/server-github"], + "env": {"GITHUB_TOKEN": os.getenv("GITHUB_TOKEN")} + }, + "postgres": { + "command": "npx", + "args": ["@modelcontextprotocol/server-postgres"], + "env": { + "POSTGRES_CONNECTION_STRING": os.getenv("DATABASE_URL") + } + }, + "slack": { + "command": "npx", + "args": ["@modelcontextprotocol/server-slack"], + "env": { + "SLACK_BOT_TOKEN": os.getenv("SLACK_BOT_TOKEN"), + "SLACK_USER_TOKEN": os.getenv("SLACK_USER_TOKEN") + } + } +} +``` + +### Environment-based Configuration + +```python mcp_env_config.py +import os +import json + +def load_mcp_config(): + """Load MCP configuration from environment or file.""" + + # Try environment variable first + config_json = os.getenv("MCP_CONFIG") + if config_json: + return json.loads(config_json) + + # Try config file + config_path = os.getenv("MCP_CONFIG_PATH", "~/.mcp/config.json") + expanded_path = os.path.expanduser(config_path) + + if os.path.exists(expanded_path): + with open(expanded_path, 'r') as f: + return json.load(f) + + # Default configuration + return { + "servers": { + "filesystem": { + "command": "npx", + "args": ["@modelcontextprotocol/server-filesystem", "/tmp"] + } + } + } +``` + +## Best Practices + + + + +1. **Always use lifecycle decorators** for proper initialization and cleanup +2. **Implement health checks** to monitor server connectivity +3. **Handle connection failures gracefully** with retry logic +4. **Use environment variables** for server configuration + + + + + +1. **Catch and log MCP-specific exceptions** +2. **Provide fallback mechanisms** when MCP servers are unavailable +3. 
**Implement circuit breaker patterns** for unreliable servers +4. **Return meaningful error messages** to users + + + + + +1. **Pool connections** for frequently used servers +2. **Cache server capabilities** to avoid repeated discovery calls +3. **Use async/await** throughout for non-blocking operations +4. **Implement request timeouts** to prevent hanging operations + + + + +## Troubleshooting + + + + +### Connection Failures +```python +# Check server status +if not await mcp_tools.is_connected(server_name): + logger.error(f"Server {server_name} is not connected") + await mcp_tools.reconnect(server_name) +``` + +### Tool Discovery Issues +```python +# List available tools for debugging +tools = await mcp_tools.list_tools(server_name) +logger.info(f"Available tools for {server_name}: {[t['name'] for t in tools]}") +``` + +### Environment Problems +```python +# Validate MCP server commands +import shutil + +for server_name, config in mcp_servers.items(): + command = config["command"] + if not shutil.which(command): + logger.error(f"Command not found: {command} for server {server_name}") +``` + + + + +## Next Steps + +1. **Explore Advanced Features**: Learn about MCP resource management and streaming +2. **Custom Server Integration**: Build your own MCP servers for specific use cases +3. **Production Deployment**: Scale your MCP integration with proper monitoring +4. 
**Security Considerations**: Implement authentication and authorization for MCP servers + +## References + +- **XPander Issue**: #401 +- **SDK Version**: v2.0.32 +- **MCP Protocol**: [Model Context Protocol Specification](https://modelcontextprotocol.io) +- **Related Examples**: [Lifecycle Management](/Examples/10-lifecycle-management), [MCP Task Sources](/user-guide/task-sources/mcp) \ No newline at end of file diff --git a/Examples/12-advanced-lifecycle-management.mdx b/Examples/12-advanced-lifecycle-management.mdx new file mode 100644 index 0000000..0c4e311 --- /dev/null +++ b/Examples/12-advanced-lifecycle-management.mdx @@ -0,0 +1,811 @@ +--- +title: "Advanced Lifecycle Management with MCP" +description: "Comprehensive lifecycle management for MCP integration with on_boot, on_shutdown, and on_task decorators" +icon: "arrows-rotate" +--- + + + **Lifecycle Guide Summary** + + - **Goal**: Master lifecycle management for MCP-enabled applications + - **SDK Version**: v2.0.32 or higher + - **Prerequisites**: Basic understanding of async/await and decorators + - **Reference**: XPander issue #401, SDK v2.0.32 + + +## Overview + +This guide demonstrates advanced lifecycle management patterns specifically designed for MCP (Model Context Protocol) integrations. You'll learn how to properly initialize, manage, and clean up MCP resources using XPander's lifecycle decorators. 
+ +## Lifecycle Phases + + + +Initialize MCP servers, validate configuration, and establish connections + + + +Handle tasks with MCP tools, manage connections, and monitor health + + + +Gracefully disconnect from servers, save state, and cleanup resources + + + +## Complete Implementation + + + + +### Virtual Environment Setup +```bash setup.sh +python3 -m venv .venv +source .venv/bin/activate +pip install "xpander-sdk[agno]>=2.0.32" +``` + +### Environment Configuration +```bash .env +XPANDER_API_KEY=your_api_key +XPANDER_ORGANIZATION_ID=your_org_id +GITHUB_TOKEN=your_github_token +SLACK_BOT_TOKEN=your_slack_token +MCP_CONFIG_PATH=~/.mcp/config.json +``` + + + + +```python advanced_lifecycle_mcp.py +from dotenv import load_dotenv +load_dotenv() + +from xpander_sdk import Backend, on_boot, on_shutdown, on_task +from xpander_sdk.tools import MultiMCPTools +from agno.agent import Agent +import asyncio +import logging +import json +import os +from datetime import datetime +from typing import Dict, Any, Optional +import signal + +# Configure comprehensive logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +# Global state management +class MCPState: + def __init__(self): + self.tools: Optional[MultiMCPTools] = None + self.connected: bool = False + self.servers: Dict[str, Dict[str, Any]] = {} + self.metrics: Dict[str, Any] = { + "boot_time": None, + "tasks_processed": 0, + "errors_encountered": 0, + "reconnections": 0 + } + self.shutdown_requested: bool = False + +# Global state instance +mcp_state = MCPState() + +# === BOOT PHASE === + +@on_boot +def setup_signal_handlers(): + """Setup graceful shutdown signal handlers.""" + def signal_handler(signum, frame): + logger.info(f"๐Ÿ“ก Received signal {signum}, initiating graceful shutdown...") + mcp_state.shutdown_requested = True + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, 
signal_handler) + logger.info("๐Ÿ“ก Signal handlers configured") + +@on_boot +def validate_mcp_environment(): + """Comprehensive environment validation for MCP integration.""" + logger.info("๐Ÿ” Validating MCP environment...") + + # Required environment variables + required_vars = ["XPANDER_API_KEY", "XPANDER_ORGANIZATION_ID"] + missing_required = [var for var in required_vars if not os.getenv(var)] + + if missing_required: + raise EnvironmentError(f"โŒ Missing required variables: {missing_required}") + + # Optional but recommended variables + optional_vars = { + "GITHUB_TOKEN": "GitHub MCP server functionality will be limited", + "SLACK_BOT_TOKEN": "Slack MCP server will not be available", + "MCP_CONFIG_PATH": "Using default MCP configuration path" + } + + for var, warning in optional_vars.items(): + if not os.getenv(var): + logger.warning(f"โš ๏ธ {var} not set: {warning}") + else: + logger.info(f"โœ… {var} configured") + + # Validate MCP configuration file + config_path = os.path.expanduser(os.getenv("MCP_CONFIG_PATH", "~/.mcp/config.json")) + if os.path.exists(config_path): + try: + with open(config_path, 'r') as f: + config = json.load(f) + logger.info(f"โœ… MCP configuration loaded from {config_path}") + except json.JSONDecodeError as e: + logger.error(f"โŒ Invalid JSON in MCP config: {e}") + raise + else: + logger.info(f"๐Ÿ“„ No MCP config file found at {config_path}, using defaults") + + logger.info("โœ… Environment validation completed") + +@on_boot +async def initialize_mcp_tools(): + """Initialize MCP tools with comprehensive server configuration.""" + global mcp_state + + logger.info("๐Ÿ”Œ Initializing MCP tools and servers...") + mcp_state.metrics["boot_time"] = datetime.now() + + try: + # Load server configuration + servers_config = load_server_configuration() + + # Initialize MultiMCPTools + mcp_state.tools = MultiMCPTools(servers=servers_config) + + # Connect to all configured servers + connection_results = await 
connect_servers_with_retry(servers_config) + + # Update state based on connection results + mcp_state.connected = any(connection_results.values()) + mcp_state.servers = { + name: { + "status": "connected" if connected else "failed", + "last_ping": datetime.now().isoformat() if connected else None, + "tools_available": 0, + "reconnection_attempts": 0 + } + for name, connected in connection_results.items() + } + + # Discover available tools for connected servers + await discover_server_capabilities() + + if mcp_state.connected: + logger.info(f"โœ… MCP initialization complete. Connected to {sum(connection_results.values())}/{len(servers_config)} servers") + else: + logger.error("โŒ Failed to connect to any MCP servers") + + except Exception as e: + logger.error(f"โŒ MCP initialization failed: {e}") + mcp_state.connected = False + raise + +@on_boot +async def start_background_services(): + """Start background services for health monitoring and maintenance.""" + logger.info("๐Ÿ”„ Starting background services...") + + # Start health monitoring + asyncio.create_task(health_monitoring_service()) + + # Start metrics collection + asyncio.create_task(metrics_collection_service()) + + # Start connection maintenance + asyncio.create_task(connection_maintenance_service()) + + logger.info("โœ… Background services started") + +# === RUNTIME PHASE === + +@on_task +async def handle_mcp_task(task): + """Advanced task handling with MCP integration.""" + global mcp_state + + logger.info(f"๐Ÿ“จ Processing MCP task: {task.id}") + mcp_state.metrics["tasks_processed"] += 1 + + # Check if MCP is available + if not mcp_state.connected or not mcp_state.tools: + mcp_state.metrics["errors_encountered"] += 1 + task.result = { + "error": "MCP services unavailable", + "status": "failed", + "details": "MCP integration is not properly initialized", + "available_servers": list(mcp_state.servers.keys()) + } + return task + + try: + # Task analysis and routing + task_analysis = 
analyze_task_requirements(task.input.text) + + # Route to appropriate MCP server + result = await route_task_to_server(task_analysis) + + # Process result and set task response + task.result = { + "status": "completed", + "server_used": result.get("server_name"), + "operation": result.get("operation"), + "data": result.get("data"), + "processing_time_ms": result.get("processing_time"), + "timestamp": datetime.now().isoformat() + } + + logger.info(f"โœ… Task {task.id} completed successfully") + + except Exception as e: + logger.error(f"โŒ Task processing failed: {e}") + mcp_state.metrics["errors_encountered"] += 1 + + task.result = { + "error": str(e), + "status": "failed", + "server_status": {name: info["status"] for name, info in mcp_state.servers.items()}, + "timestamp": datetime.now().isoformat() + } + + return task + +@on_task +async def handle_health_check(task): + """Handle health check requests for MCP integration.""" + global mcp_state + + if "health" in task.input.text.lower() or "status" in task.input.text.lower(): + health_report = await generate_health_report() + + task.result = { + "health_report": health_report, + "status": "completed", + "timestamp": datetime.now().isoformat() + } + + return task + +# === SHUTDOWN PHASE === + +@on_shutdown +async def save_mcp_metrics(): + """Save comprehensive MCP metrics before shutdown.""" + global mcp_state + + logger.info("๐Ÿ“Š Saving MCP metrics and state...") + + # Calculate uptime + if mcp_state.metrics["boot_time"]: + uptime = datetime.now() - mcp_state.metrics["boot_time"] + mcp_state.metrics["uptime_seconds"] = uptime.total_seconds() + + # Compile final metrics + final_metrics = { + "session_metrics": mcp_state.metrics, + "server_status": mcp_state.servers, + "final_health_check": await generate_health_report() if mcp_state.connected else None, + "shutdown_timestamp": datetime.now().isoformat() + } + + # Save to file (in production, send to monitoring system) + metrics_path = 
"/tmp/mcp_session_metrics.json" + try: + with open(metrics_path, 'w') as f: + json.dump(final_metrics, f, indent=2, default=str) + logger.info(f"๐Ÿ“ˆ Metrics saved to {metrics_path}") + except Exception as e: + logger.error(f"โŒ Failed to save metrics: {e}") + +@on_shutdown +async def graceful_mcp_shutdown(): + """Gracefully shutdown all MCP connections.""" + global mcp_state + + logger.info("๐Ÿ”Œ Shutting down MCP connections...") + + if mcp_state.tools: + try: + # Disconnect from all servers gracefully + for server_name in mcp_state.servers: + if mcp_state.servers[server_name]["status"] == "connected": + logger.info(f"๐Ÿ”Œ Disconnecting from {server_name}...") + await mcp_state.tools.disconnect(server_name) + mcp_state.servers[server_name]["status"] = "disconnected" + + logger.info("โœ… All MCP connections closed gracefully") + + except Exception as e: + logger.error(f"โŒ Error during MCP shutdown: {e}") + + # Reset state + mcp_state.connected = False + mcp_state.tools = None + +@on_shutdown +def cleanup_resources(): + """Final cleanup of resources and temporary files.""" + logger.info("๐Ÿงน Performing final resource cleanup...") + + # Clear sensitive data from memory + if mcp_state.servers: + for server_info in mcp_state.servers.values(): + if "auth_token" in server_info: + server_info["auth_token"] = "[REDACTED]" + + # Clean up temporary files + temp_files = ["/tmp/mcp_temp_*", "/tmp/mcp_cache_*"] + for pattern in temp_files: + import glob + for file_path in glob.glob(pattern): + try: + os.remove(file_path) + logger.info(f"๐Ÿ—‘๏ธ Cleaned up {file_path}") + except Exception as e: + logger.warning(f"โš ๏ธ Failed to clean {file_path}: {e}") + + logger.info("โœ… Resource cleanup completed") + +# === HELPER FUNCTIONS === + +def load_server_configuration() -> Dict[str, Dict[str, Any]]: + """Load MCP server configuration from various sources.""" + + # Default configuration + default_config = { + "filesystem": { + "command": "npx", + "args": 
["@modelcontextprotocol/server-filesystem", "/tmp"], + "env": {} + }, + "github": { + "command": "npx", + "args": ["@modelcontextprotocol/server-github"], + "env": {"GITHUB_TOKEN": os.getenv("GITHUB_TOKEN", "")} + } + } + + # Try to load from config file + config_path = os.path.expanduser(os.getenv("MCP_CONFIG_PATH", "~/.mcp/config.json")) + if os.path.exists(config_path): + try: + with open(config_path, 'r') as f: + file_config = json.load(f) + default_config.update(file_config.get("servers", {})) + except Exception as e: + logger.warning(f"โš ๏ธ Failed to load config from {config_path}: {e}") + + # Filter out servers with missing required environment variables + filtered_config = {} + for name, config in default_config.items(): + if name == "github" and not os.getenv("GITHUB_TOKEN"): + logger.info(f"โญ๏ธ Skipping {name} server (missing GITHUB_TOKEN)") + continue + filtered_config[name] = config + + return filtered_config + +async def connect_servers_with_retry(servers_config: Dict[str, Dict[str, Any]], max_retries: int = 3) -> Dict[str, bool]: + """Connect to MCP servers with retry logic.""" + connection_results = {} + + for server_name, config in servers_config.items(): + logger.info(f"๐Ÿ”— Connecting to {server_name}...") + + for attempt in range(max_retries): + try: + await mcp_state.tools.connect(server_name) + connection_results[server_name] = True + logger.info(f"โœ… Connected to {server_name}") + break + + except Exception as e: + logger.warning(f"โš ๏ธ Connection attempt {attempt + 1}/{max_retries} failed for {server_name}: {e}") + if attempt < max_retries - 1: + await asyncio.sleep(2 ** attempt) # Exponential backoff + else: + connection_results[server_name] = False + logger.error(f"โŒ Failed to connect to {server_name} after {max_retries} attempts") + + return connection_results + +async def discover_server_capabilities(): + """Discover and cache capabilities for all connected servers.""" + for server_name, server_info in mcp_state.servers.items(): 
+ if server_info["status"] == "connected": + try: + tools = await mcp_state.tools.list_tools(server_name) + resources = await mcp_state.tools.list_resources(server_name) if hasattr(mcp_state.tools, 'list_resources') else [] + + server_info["tools_available"] = len(tools) + server_info["resources_available"] = len(resources) + server_info["capabilities"] = { + "tools": [tool["name"] for tool in tools], + "resources": [resource["uri"] for resource in resources] if resources else [] + } + + logger.info(f"๐Ÿ” Discovered {len(tools)} tools for {server_name}") + + except Exception as e: + logger.warning(f"โš ๏ธ Failed to discover capabilities for {server_name}: {e}") + +def analyze_task_requirements(task_text: str) -> Dict[str, Any]: + """Analyze task to determine required MCP server and operations.""" + analysis = { + "text": task_text.lower(), + "required_servers": [], + "suggested_operations": [], + "priority": "normal" + } + + # Keyword-based analysis + if any(keyword in analysis["text"] for keyword in ["file", "directory", "read", "write"]): + analysis["required_servers"].append("filesystem") + analysis["suggested_operations"].extend(["read_file", "list_directory", "write_file"]) + + if any(keyword in analysis["text"] for keyword in ["github", "repository", "repo", "git"]): + analysis["required_servers"].append("github") + analysis["suggested_operations"].extend(["search_repositories", "get_file_contents"]) + + if any(keyword in analysis["text"] for keyword in ["search", "web", "internet"]): + analysis["required_servers"].append("brave") + analysis["suggested_operations"].append("web_search") + + # Determine priority + if any(keyword in analysis["text"] for keyword in ["urgent", "asap", "immediately"]): + analysis["priority"] = "high" + + return analysis + +async def route_task_to_server(task_analysis: Dict[str, Any]) -> Dict[str, Any]: + """Route task to appropriate MCP server based on analysis.""" + start_time = datetime.now() + + for server_name in 
task_analysis["required_servers"]: + if server_name in mcp_state.servers and mcp_state.servers[server_name]["status"] == "connected": + + # Get available tools for this server + available_tools = mcp_state.servers[server_name].get("capabilities", {}).get("tools", []) + + # Find matching operations + matching_ops = [op for op in task_analysis["suggested_operations"] if op in available_tools] + + if matching_ops: + # Execute the first matching operation (simplified routing) + operation = matching_ops[0] + + try: + # This would call the actual MCP tool - simplified for example + result = await execute_mcp_operation(server_name, operation, task_analysis) + + processing_time = (datetime.now() - start_time).total_seconds() * 1000 + + return { + "server_name": server_name, + "operation": operation, + "data": result, + "processing_time": processing_time + } + + except Exception as e: + logger.error(f"โŒ Operation {operation} failed on {server_name}: {e}") + continue + + # Fallback: return available capabilities + return { + "server_name": "system", + "operation": "list_capabilities", + "data": { + "available_servers": list(mcp_state.servers.keys()), + "server_status": {name: info["status"] for name, info in mcp_state.servers.items()} + }, + "processing_time": (datetime.now() - start_time).total_seconds() * 1000 + } + +async def execute_mcp_operation(server_name: str, operation: str, task_analysis: Dict[str, Any]) -> Any: + """Execute specific MCP operation - simplified implementation.""" + + # This is a simplified implementation - in practice, you'd parse the task + # and extract proper parameters for each operation + + if operation == "read_file": + # Extract file path from task text + file_path = "/tmp/example.txt" # Simplified + return await mcp_state.tools.call_tool(server_name, operation, {"path": file_path}) + + elif operation == "search_repositories": + # Extract search query + query = "MCP protocol" # Simplified + return await 
mcp_state.tools.call_tool(server_name, operation, {"query": query}) + + elif operation == "web_search": + # Extract search terms + query = task_analysis["text"] + return await mcp_state.tools.call_tool(server_name, operation, {"query": query}) + + else: + return {"message": f"Operation {operation} not implemented"} + +async def generate_health_report() -> Dict[str, Any]: + """Generate comprehensive health report for MCP integration.""" + health_report = { + "timestamp": datetime.now().isoformat(), + "overall_status": "healthy" if mcp_state.connected else "unhealthy", + "uptime": None, + "servers": {}, + "metrics": mcp_state.metrics.copy() + } + + # Calculate uptime + if mcp_state.metrics["boot_time"]: + uptime = datetime.now() - mcp_state.metrics["boot_time"] + health_report["uptime"] = uptime.total_seconds() + + # Server-specific health checks + for server_name, server_info in mcp_state.servers.items(): + try: + if server_info["status"] == "connected" and mcp_state.tools: + # Ping server + tools = await mcp_state.tools.list_tools(server_name) + health_report["servers"][server_name] = { + "status": "healthy", + "tools_count": len(tools), + "last_successful_ping": datetime.now().isoformat(), + "reconnection_attempts": server_info.get("reconnection_attempts", 0) + } + else: + health_report["servers"][server_name] = { + "status": "unhealthy", + "reason": "Not connected", + "reconnection_attempts": server_info.get("reconnection_attempts", 0) + } + + except Exception as e: + health_report["servers"][server_name] = { + "status": "unhealthy", + "error": str(e), + "last_error_time": datetime.now().isoformat() + } + + return health_report + +# === BACKGROUND SERVICES === + +async def health_monitoring_service(): + """Background service for continuous health monitoring.""" + logger.info("๐Ÿฉบ Health monitoring service started") + + while not mcp_state.shutdown_requested: + try: + if mcp_state.connected: + health_report = await generate_health_report() + + # Check for 
unhealthy servers + unhealthy_servers = [ + name for name, status in health_report["servers"].items() + if status.get("status") != "healthy" + ] + + if unhealthy_servers: + logger.warning(f"โš ๏ธ Unhealthy servers detected: {unhealthy_servers}") + # Trigger reconnection attempts + for server_name in unhealthy_servers: + await attempt_server_reconnection(server_name) + + await asyncio.sleep(60) # Check every minute + + except Exception as e: + logger.error(f"โŒ Health monitoring error: {e}") + await asyncio.sleep(60) + + logger.info("๐Ÿฉบ Health monitoring service stopped") + +async def metrics_collection_service(): + """Background service for metrics collection.""" + logger.info("๐Ÿ“Š Metrics collection service started") + + while not mcp_state.shutdown_requested: + try: + # Update metrics periodically + mcp_state.metrics["timestamp"] = datetime.now().isoformat() + + # Log periodic metrics + if mcp_state.metrics["tasks_processed"] % 100 == 0 and mcp_state.metrics["tasks_processed"] > 0: + logger.info(f"๐Ÿ“ˆ Processed {mcp_state.metrics['tasks_processed']} tasks, {mcp_state.metrics['errors_encountered']} errors") + + await asyncio.sleep(300) # Update every 5 minutes + + except Exception as e: + logger.error(f"โŒ Metrics collection error: {e}") + await asyncio.sleep(300) + + logger.info("๐Ÿ“Š Metrics collection service stopped") + +async def connection_maintenance_service(): + """Background service for connection maintenance.""" + logger.info("๐Ÿ”ง Connection maintenance service started") + + while not mcp_state.shutdown_requested: + try: + # Perform connection maintenance + if mcp_state.connected: + for server_name, server_info in mcp_state.servers.items(): + if server_info["status"] == "connected": + # Periodic ping to keep connection alive + try: + await mcp_state.tools.list_tools(server_name) + server_info["last_ping"] = datetime.now().isoformat() + except Exception as e: + logger.warning(f"โš ๏ธ Connection maintenance failed for {server_name}: {e}") + 
server_info["status"] = "unhealthy" + + await asyncio.sleep(600) # Maintenance every 10 minutes + + except Exception as e: + logger.error(f"โŒ Connection maintenance error: {e}") + await asyncio.sleep(600) + + logger.info("๐Ÿ”ง Connection maintenance service stopped") + +async def attempt_server_reconnection(server_name: str): + """Attempt to reconnect to a specific server.""" + server_info = mcp_state.servers.get(server_name) + if not server_info: + return + + server_info["reconnection_attempts"] = server_info.get("reconnection_attempts", 0) + 1 + mcp_state.metrics["reconnections"] += 1 + + logger.info(f"๐Ÿ”„ Attempting reconnection to {server_name} (attempt #{server_info['reconnection_attempts']})") + + try: + await mcp_state.tools.reconnect(server_name) + server_info["status"] = "connected" + server_info["last_ping"] = datetime.now().isoformat() + logger.info(f"โœ… Successfully reconnected to {server_name}") + + except Exception as e: + logger.error(f"โŒ Reconnection failed for {server_name}: {e}") + server_info["status"] = "failed" + +# Initialize backend and agent +backend = Backend() +agno_agent = Agent(**backend.get_args()) + +# The agent is now ready with advanced MCP lifecycle management +logger.info("๐Ÿš€ Agent ready with advanced MCP lifecycle management!") +logger.info("๐Ÿ“‹ Try: 'Check MCP health status'") +logger.info("๐Ÿ“‹ Try: 'Search for Python examples on GitHub'") +logger.info("๐Ÿ“‹ Try: 'Read the file /tmp/config.json'") + +# Example usage +if __name__ == "__main__": + try: + agno_agent.print_response(message="Check MCP health status and show available capabilities") + except KeyboardInterrupt: + logger.info("๐Ÿ”„ Graceful shutdown initiated...") +``` + +## Production Deployment Considerations + + + +```python +# Send metrics to monitoring system +import prometheus_client +from prometheus_client import Counter, Histogram, Gauge + +mcp_tasks_total = Counter('mcp_tasks_processed_total') +mcp_errors_total = Counter('mcp_errors_total') 
+mcp_connection_status = Gauge('mcp_server_connected') +``` + + + +```python +# Environment-based configuration +class MCPConfig: + @classmethod + def from_env(cls): + return cls( + servers=json.loads(os.getenv('MCP_SERVERS', '{}')), + timeouts=int(os.getenv('MCP_TIMEOUT', '30')), + retry_attempts=int(os.getenv('MCP_RETRY_ATTEMPTS', '3')) + ) +``` + + + +## Testing Your Lifecycle Implementation + +```python test_lifecycle.py +import pytest +import asyncio + +@pytest.mark.asyncio +async def test_mcp_initialization(): + """Test MCP initialization process.""" + # Simulate initialization + await initialize_mcp_tools() + assert mcp_state.connected == True + assert len(mcp_state.servers) > 0 + +@pytest.mark.asyncio +async def test_graceful_shutdown(): + """Test graceful shutdown process.""" + # Setup + await initialize_mcp_tools() + + # Trigger shutdown + await graceful_mcp_shutdown() + + # Verify cleanup + assert mcp_state.connected == False + assert mcp_state.tools == None +``` + +## Key Lifecycle Patterns + + + + +1. **Environment Validation First** - Always validate before connecting +2. **Graceful Degradation** - Continue with partial functionality if some servers fail +3. **Background Services** - Start monitoring and maintenance services +4. **Signal Handling** - Setup proper signal handlers for graceful shutdown + + + + + +1. **Health Monitoring** - Continuous monitoring of server health +2. **Connection Pooling** - Efficient connection management +3. **Circuit Breaking** - Fail fast for unhealthy servers +4. **Metrics Collection** - Comprehensive operational metrics + + + + + +1. **Signal Handling** - Respond to system shutdown signals +2. **State Preservation** - Save important state before shutdown +3. **Connection Cleanup** - Gracefully close all connections +4. **Resource Cleanup** - Clean up temporary files and sensitive data + + + + +## Best Practices Summary + +1. **Always use lifecycle decorators** for proper resource management +2. 
**Implement comprehensive health checking** with automated recovery +3. **Use background services** for monitoring and maintenance +4. **Handle errors gracefully** with fallback mechanisms +5. **Save operational metrics** for monitoring and debugging +6. **Clean up resources thoroughly** during shutdown +7. **Use environment-based configuration** for flexibility +8. **Implement proper signal handling** for graceful shutdowns + +## Next Steps + +- **Production Monitoring**: Integrate with your monitoring stack +- **Custom MCP Servers**: Build domain-specific MCP servers +- **Load Testing**: Test your implementation under load +- **Security Hardening**: Implement authentication and authorization + +## References + +- **XPander Issue**: #401 +- **SDK Version**: v2.0.32 +- **Related Examples**: [MCP Integration Guide](/Examples/11-mcp-integration-guide), [Basic Lifecycle Management](/Examples/10-lifecycle-management) \ No newline at end of file diff --git a/Examples/13-mcp-best-practices-testing.mdx b/Examples/13-mcp-best-practices-testing.mdx new file mode 100644 index 0000000..91a7a11 --- /dev/null +++ b/Examples/13-mcp-best-practices-testing.mdx @@ -0,0 +1,1171 @@ +--- +title: "MCP Best Practices & Testing Guide" +description: "Comprehensive best practices, testing strategies, and production guidelines for MCP integration" +icon: "clipboard-check" +--- + + + **Best Practices Guide Summary** + + - **Goal**: Production-ready MCP integration with comprehensive testing + - **SDK Version**: v2.0.32 or higher + - **Prerequisites**: Understanding of testing frameworks and monitoring + - **Reference**: XPander issue #401 + + +## Best Practices Overview + +This guide provides comprehensive best practices for implementing, testing, and maintaining MCP integrations in production environments. 
+ +## Development Best Practices + + + +Environment-based configuration with validation and fallbacks + + + +Comprehensive error handling with retry logic and circuit breakers + + + +Detailed logging, metrics, and health checks for operational visibility + + + +Unit, integration, and end-to-end testing for reliable deployments + + + +## Configuration Best Practices + +### Environment-Based Configuration + +```python config_best_practices.py +import os +import json +from typing import Dict, Any, Optional +from dataclasses import dataclass, field +from pathlib import Path + +@dataclass +class MCPServerConfig: + """Configuration for a single MCP server.""" + command: str + args: list[str] = field(default_factory=list) + env: Dict[str, str] = field(default_factory=dict) + timeout: int = 30 + retry_attempts: int = 3 + health_check_interval: int = 60 + +@dataclass +class MCPConfig: + """Complete MCP configuration.""" + servers: Dict[str, MCPServerConfig] = field(default_factory=dict) + global_timeout: int = 30 + max_concurrent_connections: int = 10 + health_check_enabled: bool = True + metrics_enabled: bool = True + + @classmethod + def from_env(cls) -> 'MCPConfig': + """Load configuration from environment variables.""" + config = cls() + + # Load from environment variable + config_json = os.getenv('MCP_CONFIG_JSON') + if config_json: + data = json.loads(config_json) + return cls.from_dict(data) + + # Load from config file + config_path = os.getenv('MCP_CONFIG_PATH', '~/.mcp/config.json') + config_file = Path(config_path).expanduser() + + if config_file.exists(): + with open(config_file) as f: + data = json.load(f) + return cls.from_dict(data) + + # Default configuration + return cls.default_config() + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'MCPConfig': + """Create configuration from dictionary.""" + servers = {} + for name, server_data in data.get('servers', {}).items(): + servers[name] = MCPServerConfig(**server_data) + + return cls( + 
servers=servers, + global_timeout=data.get('global_timeout', 30), + max_concurrent_connections=data.get('max_concurrent_connections', 10), + health_check_enabled=data.get('health_check_enabled', True), + metrics_enabled=data.get('metrics_enabled', True) + ) + + @classmethod + def default_config(cls) -> 'MCPConfig': + """Create default configuration.""" + servers = {} + + # Add filesystem server if available + servers['filesystem'] = MCPServerConfig( + command='npx', + args=['@modelcontextprotocol/server-filesystem', '/tmp'], + env={} + ) + + # Add GitHub server if token available + github_token = os.getenv('GITHUB_TOKEN') + if github_token: + servers['github'] = MCPServerConfig( + command='npx', + args=['@modelcontextprotocol/server-github'], + env={'GITHUB_TOKEN': github_token} + ) + + return cls(servers=servers) + + def validate(self) -> list[str]: + """Validate configuration and return list of errors.""" + errors = [] + + if not self.servers: + errors.append("No MCP servers configured") + + for name, server in self.servers.items(): + if not server.command: + errors.append(f"Server {name}: command is required") + + if server.timeout <= 0: + errors.append(f"Server {name}: timeout must be positive") + + return errors + +# Usage example +def load_validated_config() -> MCPConfig: + """Load and validate MCP configuration.""" + config = MCPConfig.from_env() + errors = config.validate() + + if errors: + raise ValueError(f"Configuration validation failed: {errors}") + + return config +``` + +### Secure Configuration Management + +```python secure_config.py +import os +from cryptography.fernet import Fernet +import base64 + +class SecureMCPConfig: + """Secure configuration management for MCP.""" + + def __init__(self): + # Get encryption key from environment or generate + key = os.getenv('MCP_ENCRYPTION_KEY') + if key: + self.fernet = Fernet(key.encode()) + else: + self.fernet = Fernet(Fernet.generate_key()) + + def encrypt_sensitive_data(self, data: str) -> str: + 
"""Encrypt sensitive configuration data.""" + return base64.urlsafe_b64encode( + self.fernet.encrypt(data.encode()) + ).decode() + + def decrypt_sensitive_data(self, encrypted_data: str) -> str: + """Decrypt sensitive configuration data.""" + return self.fernet.decrypt( + base64.urlsafe_b64decode(encrypted_data.encode()) + ).decode() + + def load_server_tokens(self) -> Dict[str, str]: + """Load encrypted server tokens.""" + tokens = {} + + # Load encrypted tokens from environment + for key, value in os.environ.items(): + if key.startswith('MCP_TOKEN_'): + server_name = key.replace('MCP_TOKEN_', '').lower() + try: + tokens[server_name] = self.decrypt_sensitive_data(value) + except Exception as e: + logger.warning(f"Failed to decrypt token for {server_name}: {e}") + + return tokens +``` + +## Error Handling & Resilience + +### Circuit Breaker Pattern + +```python circuit_breaker.py +import asyncio +import time +from enum import Enum +from typing import Callable, Any, Optional + +class CircuitState(Enum): + CLOSED = "closed" + OPEN = "open" + HALF_OPEN = "half_open" + +class CircuitBreaker: + """Circuit breaker for MCP server connections.""" + + def __init__( + self, + failure_threshold: int = 5, + recovery_timeout: int = 60, + expected_exception: type = Exception + ): + self.failure_threshold = failure_threshold + self.recovery_timeout = recovery_timeout + self.expected_exception = expected_exception + + self.failure_count = 0 + self.last_failure_time: Optional[float] = None + self.state = CircuitState.CLOSED + + async def call(self, func: Callable, *args, **kwargs) -> Any: + """Execute function with circuit breaker protection.""" + + if self.state == CircuitState.OPEN: + if self._should_attempt_reset(): + self.state = CircuitState.HALF_OPEN + else: + raise Exception("Circuit breaker is OPEN") + + try: + result = await func(*args, **kwargs) + self._on_success() + return result + + except self.expected_exception as e: + self._on_failure() + raise e + + def 
_should_attempt_reset(self) -> bool: + """Check if enough time has passed to attempt reset.""" + if self.last_failure_time is None: + return True + + return time.time() - self.last_failure_time >= self.recovery_timeout + + def _on_success(self): + """Handle successful execution.""" + self.failure_count = 0 + self.state = CircuitState.CLOSED + + def _on_failure(self): + """Handle failed execution.""" + self.failure_count += 1 + self.last_failure_time = time.time() + + if self.failure_count >= self.failure_threshold: + self.state = CircuitState.OPEN + +# Usage with MCP tools +class ResilientMCPTools: + """MCP tools wrapper with circuit breaker protection.""" + + def __init__(self, mcp_tools): + self.mcp_tools = mcp_tools + self.circuit_breakers = {} + + def get_circuit_breaker(self, server_name: str) -> CircuitBreaker: + """Get circuit breaker for specific server.""" + if server_name not in self.circuit_breakers: + self.circuit_breakers[server_name] = CircuitBreaker( + failure_threshold=3, + recovery_timeout=30 + ) + return self.circuit_breakers[server_name] + + async def call_tool_with_protection(self, server_name: str, tool_name: str, args: dict): + """Call MCP tool with circuit breaker protection.""" + circuit_breaker = self.get_circuit_breaker(server_name) + + return await circuit_breaker.call( + self.mcp_tools.call_tool, + server_name, + tool_name, + args + ) +``` + +### Retry Logic with Exponential Backoff + +```python retry_logic.py +import asyncio +import random +from typing import Callable, Any, Optional +import logging + +logger = logging.getLogger(__name__) + +async def retry_with_exponential_backoff( + func: Callable, + max_retries: int = 3, + base_delay: float = 1.0, + max_delay: float = 60.0, + backoff_multiplier: float = 2.0, + jitter: bool = True, + exceptions: tuple = (Exception,) +): + """Execute function with exponential backoff retry logic.""" + + for attempt in range(max_retries + 1): + try: + return await func() + + except exceptions as e: + if 
attempt == max_retries: + logger.error(f"Function failed after {max_retries + 1} attempts: {e}") + raise e + + # Calculate delay with exponential backoff + delay = base_delay * (backoff_multiplier ** attempt) + delay = min(delay, max_delay) + + # Add jitter to prevent thundering herd + if jitter: + delay = delay * (0.5 + random.random() * 0.5) + + logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s") + await asyncio.sleep(delay) + +# Usage example +async def connect_with_retry(mcp_tools, server_name: str): + """Connect to MCP server with retry logic.""" + + async def connect_func(): + return await mcp_tools.connect(server_name) + + return await retry_with_exponential_backoff( + connect_func, + max_retries=3, + base_delay=1.0, + exceptions=(ConnectionError, TimeoutError) + ) +``` + +## Monitoring & Observability + +### Comprehensive Logging + +```python mcp_logging.py +import logging +import json +import time +from contextlib import contextmanager +from typing import Dict, Any + +class MCPLogger: + """Structured logging for MCP operations.""" + + def __init__(self, name: str): + self.logger = logging.getLogger(name) + + # Configure structured logging + handler = logging.StreamHandler() + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + handler.setFormatter(formatter) + self.logger.addHandler(handler) + self.logger.setLevel(logging.INFO) + + def log_operation(self, operation: str, server_name: str, **kwargs): + """Log MCP operation with structured data.""" + log_data = { + "operation": operation, + "server_name": server_name, + "timestamp": time.time(), + **kwargs + } + + self.logger.info(f"MCP Operation: {json.dumps(log_data)}") + + def log_error(self, operation: str, server_name: str, error: Exception, **kwargs): + """Log MCP error with structured data.""" + log_data = { + "operation": operation, + "server_name": server_name, + "error_type": type(error).__name__, + "error_message": str(error), + 
"timestamp": time.time(), + **kwargs + } + + self.logger.error(f"MCP Error: {json.dumps(log_data)}") + + @contextmanager + def operation_context(self, operation: str, server_name: str, **kwargs): + """Context manager for logging operation duration.""" + start_time = time.time() + + try: + self.log_operation(operation, server_name, status="started", **kwargs) + yield + + duration = time.time() - start_time + self.log_operation( + operation, server_name, + status="completed", + duration_ms=duration * 1000, + **kwargs + ) + + except Exception as e: + duration = time.time() - start_time + self.log_error( + operation, server_name, e, + status="failed", + duration_ms=duration * 1000, + **kwargs + ) + raise + +# Usage example +mcp_logger = MCPLogger("mcp_integration") + +async def logged_mcp_operation(mcp_tools, server_name: str, tool_name: str, args: dict): + """Execute MCP operation with comprehensive logging.""" + + with mcp_logger.operation_context("call_tool", server_name, tool_name=tool_name): + return await mcp_tools.call_tool(server_name, tool_name, args) +``` + +### Metrics Collection + +```python mcp_metrics.py +import time +from typing import Dict, Any, Optional +from dataclasses import dataclass, field +from collections import defaultdict, deque +import threading + +@dataclass +class MCPMetrics: + """Comprehensive MCP metrics collection.""" + + # Connection metrics + connection_attempts: int = 0 + successful_connections: int = 0 + failed_connections: int = 0 + active_connections: int = 0 + + # Operation metrics + total_operations: int = 0 + successful_operations: int = 0 + failed_operations: int = 0 + + # Performance metrics + average_response_time: float = 0.0 + response_times: deque = field(default_factory=lambda: deque(maxlen=1000)) + + # Server-specific metrics + server_metrics: Dict[str, Dict[str, Any]] = field(default_factory=dict) + + # Error tracking + error_counts: Dict[str, int] = field(default_factory=lambda: defaultdict(int)) + last_errors: 
Dict[str, str] = field(default_factory=dict) + + # Health metrics + last_health_check: Optional[float] = None + health_check_failures: int = 0 + + def __post_init__(self): + self._lock = threading.Lock() + + def record_connection_attempt(self, server_name: str, success: bool): + """Record connection attempt.""" + with self._lock: + self.connection_attempts += 1 + + if success: + self.successful_connections += 1 + self.active_connections += 1 + else: + self.failed_connections += 1 + + # Update server-specific metrics + if server_name not in self.server_metrics: + self.server_metrics[server_name] = { + "connection_attempts": 0, + "successful_connections": 0, + "operations": 0, + "errors": 0 + } + + self.server_metrics[server_name]["connection_attempts"] += 1 + if success: + self.server_metrics[server_name]["successful_connections"] += 1 + + def record_operation(self, server_name: str, duration: float, success: bool, error: Optional[str] = None): + """Record operation metrics.""" + with self._lock: + self.total_operations += 1 + + if success: + self.successful_operations += 1 + else: + self.failed_operations += 1 + if error: + self.error_counts[error] += 1 + self.last_errors[server_name] = error + + # Update response time metrics + self.response_times.append(duration) + if self.response_times: + self.average_response_time = sum(self.response_times) / len(self.response_times) + + # Update server-specific metrics + if server_name in self.server_metrics: + self.server_metrics[server_name]["operations"] += 1 + if not success: + self.server_metrics[server_name]["errors"] += 1 + + def record_health_check(self, success: bool): + """Record health check result.""" + with self._lock: + self.last_health_check = time.time() + if not success: + self.health_check_failures += 1 + + def get_summary(self) -> Dict[str, Any]: + """Get metrics summary.""" + with self._lock: + success_rate = ( + self.successful_operations / self.total_operations + if self.total_operations > 0 else 0 + ) + 
+ connection_success_rate = ( + self.successful_connections / self.connection_attempts + if self.connection_attempts > 0 else 0 + ) + + return { + "timestamp": time.time(), + "connections": { + "attempts": self.connection_attempts, + "successful": self.successful_connections, + "failed": self.failed_connections, + "active": self.active_connections, + "success_rate": connection_success_rate + }, + "operations": { + "total": self.total_operations, + "successful": self.successful_operations, + "failed": self.failed_operations, + "success_rate": success_rate + }, + "performance": { + "average_response_time_ms": self.average_response_time * 1000, + "response_count": len(self.response_times) + }, + "health": { + "last_check": self.last_health_check, + "failures": self.health_check_failures + }, + "servers": dict(self.server_metrics), + "top_errors": dict(list(sorted( + self.error_counts.items(), + key=lambda x: x[1], + reverse=True + ))[:5]) + } + +# Global metrics instance +mcp_metrics = MCPMetrics() + +# Usage decorators +def track_mcp_operation(server_name: str): + """Decorator to track MCP operations.""" + def decorator(func): + async def wrapper(*args, **kwargs): + start_time = time.time() + try: + result = await func(*args, **kwargs) + duration = time.time() - start_time + mcp_metrics.record_operation(server_name, duration, True) + return result + except Exception as e: + duration = time.time() - start_time + mcp_metrics.record_operation(server_name, duration, False, str(e)) + raise + return wrapper + return decorator +``` + +## Testing Strategy + +### Unit Tests + +```python test_mcp_unit.py +import pytest +import asyncio +from unittest.mock import Mock, AsyncMock, patch +from xpander_sdk.tools import MultiMCPTools + +class TestMCPIntegration: + """Unit tests for MCP integration.""" + + @pytest.fixture + def mock_mcp_tools(self): + """Mock MCP tools for testing.""" + tools = Mock(spec=MultiMCPTools) + tools.connect = AsyncMock() + tools.disconnect = AsyncMock() + 
tools.list_tools = AsyncMock() + tools.call_tool = AsyncMock() + return tools + + @pytest.fixture + def sample_server_config(self): + """Sample server configuration for testing.""" + return { + "filesystem": { + "command": "npx", + "args": ["@modelcontextprotocol/server-filesystem", "/tmp"] + } + } + + @pytest.mark.asyncio + async def test_successful_connection(self, mock_mcp_tools, sample_server_config): + """Test successful MCP server connection.""" + mock_mcp_tools.connect.return_value = True + + # Test connection + await mock_mcp_tools.connect("filesystem") + + # Verify connection was called + mock_mcp_tools.connect.assert_called_once_with("filesystem") + + @pytest.mark.asyncio + async def test_connection_failure(self, mock_mcp_tools): + """Test MCP connection failure handling.""" + mock_mcp_tools.connect.side_effect = ConnectionError("Connection failed") + + with pytest.raises(ConnectionError): + await mock_mcp_tools.connect("filesystem") + + @pytest.mark.asyncio + async def test_tool_discovery(self, mock_mcp_tools): + """Test MCP tool discovery.""" + expected_tools = [ + {"name": "read_file", "description": "Read file contents"}, + {"name": "write_file", "description": "Write file contents"} + ] + mock_mcp_tools.list_tools.return_value = expected_tools + + tools = await mock_mcp_tools.list_tools("filesystem") + + assert tools == expected_tools + mock_mcp_tools.list_tools.assert_called_once_with("filesystem") + + @pytest.mark.asyncio + async def test_tool_execution(self, mock_mcp_tools): + """Test MCP tool execution.""" + expected_result = {"content": "file contents"} + mock_mcp_tools.call_tool.return_value = expected_result + + result = await mock_mcp_tools.call_tool( + "filesystem", + "read_file", + {"path": "/tmp/test.txt"} + ) + + assert result == expected_result + mock_mcp_tools.call_tool.assert_called_once_with( + "filesystem", "read_file", {"path": "/tmp/test.txt"} + ) + + def test_metrics_collection(self): + """Test metrics collection functionality.""" 
+        metrics = MCPMetrics()
+
+        # Record some operations
+        metrics.record_connection_attempt("filesystem", True)
+        metrics.record_operation("filesystem", 0.5, True)
+        metrics.record_operation("filesystem", 1.0, False, "timeout")
+
+        summary = metrics.get_summary()
+
+        assert summary["connections"]["attempts"] == 1
+        assert summary["connections"]["successful"] == 1
+        assert summary["operations"]["total"] == 2
+        assert summary["operations"]["successful"] == 1
+        assert summary["operations"]["failed"] == 1
+
+    def test_circuit_breaker_open(self):
+        """Test circuit breaker opens after failures."""
+        circuit_breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=1)
+
+        # Simulate failures (_on_failure records state and never raises,
+        # so no try/except wrapper is needed)
+        for _ in range(3):
+            circuit_breaker._on_failure()
+
+        assert circuit_breaker.state == CircuitState.OPEN
+
+    def test_configuration_validation(self):
+        """Test configuration validation."""
+        config = MCPConfig()
+
+        # Empty configuration should have errors
+        errors = config.validate()
+        assert len(errors) > 0
+
+        # Valid configuration should pass
+        config.servers["test"] = MCPServerConfig(command="test", args=[])
+        errors = config.validate()
+        assert len(errors) == 0
+```
+
+### Integration Tests
+
+```python test_mcp_integration.py
+import pytest
+import asyncio
+import tempfile
+import json
+from pathlib import Path
+from unittest.mock import Mock, AsyncMock
+
+from xpander_sdk.tools import MultiMCPTools
+
+class TestMCPIntegrationE2E:
+    """End-to-end integration tests for MCP."""
+
+    @pytest.fixture
+    async def real_mcp_tools(self):
+        """Real MCP tools instance for integration testing."""
+        # Only run if MCP servers are available
+        pytest.importorskip("mcp")
+
+        config = {
+            "filesystem": {
+                "command": "npx",
+                "args": ["@modelcontextprotocol/server-filesystem", "/tmp"]
+            }
+        }
+
+        tools = MultiMCPTools(servers=config)
+
+        try:
+            await tools.connect_all()
+            yield tools
+        finally:
+            await tools.disconnect_all()
+
+    @pytest.mark.integration
+    @pytest.mark.asyncio
+    async def test_filesystem_operations(self, real_mcp_tools):
+        """Test real
filesystem operations."""
+        # Create test file
+        test_file = "/tmp/mcp_test.txt"
+        test_content = "MCP integration test content"
+
+        with open(test_file, 'w') as f:
+            f.write(test_content)
+
+        try:
+            # Test file reading
+            result = await real_mcp_tools.call_tool(
+                "filesystem",
+                "read_file",
+                {"path": test_file}
+            )
+
+            assert result["content"] == test_content
+
+        finally:
+            # Cleanup
+            Path(test_file).unlink(missing_ok=True)
+
+    @pytest.mark.integration
+    @pytest.mark.asyncio
+    async def test_server_health_check(self, real_mcp_tools):
+        """Test server health checking."""
+        # List available tools
+        tools = await real_mcp_tools.list_tools("filesystem")
+
+        assert len(tools) > 0
+        assert any(tool["name"] == "read_file" for tool in tools)
+
+    @pytest.mark.integration
+    @pytest.mark.asyncio
+    async def test_connection_recovery(self, real_mcp_tools):
+        """Test connection recovery after disconnection."""
+        # Disconnect and reconnect
+        await real_mcp_tools.disconnect("filesystem")
+        await real_mcp_tools.connect("filesystem")
+
+        # Verify connection works
+        tools = await real_mcp_tools.list_tools("filesystem")
+        assert len(tools) > 0
+
+@pytest.mark.load
+class TestMCPLoadTesting:
+    """Load testing for MCP integration."""
+
+    @pytest.mark.asyncio
+    async def test_concurrent_operations(self):
+        """Test concurrent MCP operations."""
+        mock_tools = Mock(spec=MultiMCPTools)
+        mock_tools.call_tool = AsyncMock(return_value={"result": "success"})
+
+        # Simulate concurrent operations
+        tasks = []
+        for i in range(100):
+            task = mock_tools.call_tool("filesystem", "read_file", {"path": f"/tmp/file_{i}.txt"})
+            tasks.append(task)
+
+        results = await asyncio.gather(*tasks)
+
+        assert len(results) == 100
+        assert all(r["result"] == "success" for r in results)
+        assert mock_tools.call_tool.call_count == 100
+
+    @pytest.mark.asyncio
+    async def test_connection_pool_limits(self):
+        """Test connection pool behavior under load."""
+        # This would test actual connection pooling limits
+        # Implementation depends
on your connection pooling strategy + pass +``` + +### Test Configuration + +```python conftest.py +import pytest +import asyncio +import os +from unittest.mock import patch + +def pytest_configure(config): + """Configure pytest with custom markers.""" + config.addinivalue_line("markers", "integration: integration tests requiring real MCP servers") + config.addinivalue_line("markers", "load: load testing scenarios") + +@pytest.fixture(scope="session") +def event_loop(): + """Create event loop for async tests.""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + yield loop + loop.close() + +@pytest.fixture +def mock_environment(): + """Mock environment variables for testing.""" + env_vars = { + "XPANDER_API_KEY": "test-api-key", + "XPANDER_ORGANIZATION_ID": "test-org-id", + "GITHUB_TOKEN": "test-github-token" + } + + with patch.dict(os.environ, env_vars): + yield env_vars + +@pytest.fixture +def temp_mcp_config(): + """Create temporary MCP configuration file.""" + import tempfile + import json + + config = { + "servers": { + "filesystem": { + "command": "npx", + "args": ["@modelcontextprotocol/server-filesystem", "/tmp"] + } + }, + "global_timeout": 30 + } + + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: + json.dump(config, f) + f.flush() + + yield f.name + + os.unlink(f.name) +``` + +## Production Deployment Checklist + + + + +### Configuration Validation +- [ ] Environment variables are properly set +- [ ] MCP server configurations are validated +- [ ] Security tokens are encrypted +- [ ] Configuration schema is validated + +### Testing +- [ ] Unit tests pass +- [ ] Integration tests pass +- [ ] Load tests meet performance requirements +- [ ] Security tests validate token handling + +### Monitoring Setup +- [ ] Logging is configured +- [ ] Metrics collection is enabled +- [ ] Health checks are implemented +- [ ] Alerting is configured + + + + + +### Infrastructure +- [ ] MCP servers are installed and available +- 
[ ] Network connectivity is verified +- [ ] Resource limits are configured +- [ ] Backup and recovery procedures are in place + +### Application +- [ ] Application is deployed with proper configuration +- [ ] Health checks are passing +- [ ] Metrics are being collected +- [ ] Logs are being generated and stored + + + + + +### Monitoring +- [ ] Monitor connection success rates +- [ ] Track operation response times +- [ ] Watch for error patterns +- [ ] Monitor resource usage + +### Maintenance +- [ ] Regular health checks +- [ ] Log analysis and cleanup +- [ ] Performance optimization +- [ ] Security updates + + + + +## Performance Optimization + +### Connection Pooling + +```python connection_pool.py +import asyncio +from typing import Dict, Optional +from dataclasses import dataclass + +@dataclass +class ConnectionPool: + """Connection pool for MCP servers.""" + max_connections: int = 10 + min_connections: int = 2 + connection_timeout: int = 30 + + def __post_init__(self): + self.pools: Dict[str, asyncio.Queue] = {} + self.active_connections: Dict[str, int] = {} + + async def get_connection(self, server_name: str): + """Get connection from pool.""" + if server_name not in self.pools: + self.pools[server_name] = asyncio.Queue(maxsize=self.max_connections) + self.active_connections[server_name] = 0 + + pool = self.pools[server_name] + + try: + # Try to get existing connection + connection = pool.get_nowait() + return connection + except asyncio.QueueEmpty: + # Create new connection if under limit + if self.active_connections[server_name] < self.max_connections: + connection = await self._create_connection(server_name) + self.active_connections[server_name] += 1 + return connection + else: + # Wait for available connection + return await asyncio.wait_for(pool.get(), timeout=self.connection_timeout) + + async def return_connection(self, server_name: str, connection): + """Return connection to pool.""" + if server_name in self.pools: + try: + 
self.pools[server_name].put_nowait(connection) + except asyncio.QueueFull: + # Pool is full, close connection + await self._close_connection(connection) + self.active_connections[server_name] -= 1 + + async def _create_connection(self, server_name: str): + """Create new connection to MCP server.""" + # Implementation depends on MCP client library + pass + + async def _close_connection(self, connection): + """Close MCP connection.""" + # Implementation depends on MCP client library + pass +``` + +### Caching Strategy + +```python mcp_caching.py +import asyncio +import time +from typing import Any, Optional, Dict, Tuple +import hashlib +import json + +class MCPCache: + """Caching layer for MCP operations.""" + + def __init__(self, default_ttl: int = 300): + self.cache: Dict[str, Tuple[Any, float]] = {} + self.default_ttl = default_ttl + self._lock = asyncio.Lock() + + def _generate_cache_key(self, server_name: str, tool_name: str, args: dict) -> str: + """Generate cache key from operation parameters.""" + key_data = { + "server": server_name, + "tool": tool_name, + "args": args + } + key_str = json.dumps(key_data, sort_keys=True) + return hashlib.md5(key_str.encode()).hexdigest() + + async def get(self, server_name: str, tool_name: str, args: dict) -> Optional[Any]: + """Get cached result if available and not expired.""" + cache_key = self._generate_cache_key(server_name, tool_name, args) + + async with self._lock: + if cache_key in self.cache: + result, expiry_time = self.cache[cache_key] + + if time.time() < expiry_time: + return result + else: + # Remove expired entry + del self.cache[cache_key] + + return None + + async def set(self, server_name: str, tool_name: str, args: dict, result: Any, ttl: Optional[int] = None): + """Cache operation result.""" + cache_key = self._generate_cache_key(server_name, tool_name, args) + expiry_time = time.time() + (ttl or self.default_ttl) + + async with self._lock: + self.cache[cache_key] = (result, expiry_time) + + async def 
invalidate(self, server_name: str, tool_name: str, args: dict): + """Invalidate specific cache entry.""" + cache_key = self._generate_cache_key(server_name, tool_name, args) + + async with self._lock: + self.cache.pop(cache_key, None) + + async def clear_expired(self): + """Clear expired cache entries.""" + current_time = time.time() + + async with self._lock: + expired_keys = [ + key for key, (_, expiry_time) in self.cache.items() + if current_time >= expiry_time + ] + + for key in expired_keys: + del self.cache[key] + +# Usage with MCP tools +class CachedMCPTools: + """MCP tools wrapper with caching.""" + + def __init__(self, mcp_tools, cache_ttl: int = 300): + self.mcp_tools = mcp_tools + self.cache = MCPCache(default_ttl=cache_ttl) + + async def call_tool_cached(self, server_name: str, tool_name: str, args: dict, use_cache: bool = True): + """Call MCP tool with caching support.""" + + if use_cache: + # Try to get from cache first + cached_result = await self.cache.get(server_name, tool_name, args) + if cached_result is not None: + return cached_result + + # Call actual MCP tool + result = await self.mcp_tools.call_tool(server_name, tool_name, args) + + if use_cache: + # Cache the result + await self.cache.set(server_name, tool_name, args, result) + + return result +``` + +## Security Best Practices + + + + +1. **Token Management**: Store tokens securely using environment variables or secret management systems +2. **Token Rotation**: Implement automatic token rotation for long-lived tokens +3. **Access Control**: Limit MCP server access based on user roles and permissions +4. **Audit Logging**: Log all MCP operations for security auditing + + + + + +1. **TLS Encryption**: Ensure all MCP communication uses TLS +2. **Network Segmentation**: Isolate MCP servers in secure network segments +3. **Firewall Rules**: Configure strict firewall rules for MCP server access +4. **VPN/Private Networks**: Use VPNs for remote MCP server access + + + + + +1. 
**Data Encryption**: Encrypt sensitive data at rest and in transit +2. **Data Sanitization**: Sanitize data before passing to MCP servers +3. **Data Retention**: Implement proper data retention and deletion policies +4. **Backup Security**: Secure backup data with encryption and access controls + + + + +## Summary + +This comprehensive guide covers: + +1. **Configuration Management**: Environment-based, secure, and validated configuration +2. **Error Handling**: Circuit breakers, retry logic, and graceful degradation +3. **Monitoring**: Structured logging, metrics collection, and health checks +4. **Testing**: Unit, integration, and load testing strategies +5. **Performance**: Connection pooling, caching, and optimization techniques +6. **Security**: Authentication, authorization, and data protection + +Following these best practices will ensure your MCP integration is production-ready, maintainable, and secure. + +## References + +- **XPander Issue**: #401 +- **SDK Version**: v2.0.32 +- **Related Examples**: [MCP Integration Guide](/Examples/11-mcp-integration-guide), [Advanced Lifecycle Management](/Examples/12-advanced-lifecycle-management) \ No newline at end of file diff --git a/docs.json b/docs.json index 12bd72a..dc5884b 100644 --- a/docs.json +++ b/docs.json @@ -66,7 +66,11 @@ "Examples/06-async-task-handling", "Examples/07-session-management", "Examples/08-knowledge-base-rag", - "Examples/09-smart-support-agent" + "Examples/09-smart-support-agent", + "Examples/10-lifecycle-management", + "Examples/11-mcp-integration-guide", + "Examples/12-advanced-lifecycle-management", + "Examples/13-mcp-best-practices-testing" ] }, {