-
Notifications
You must be signed in to change notification settings - Fork 7
Description
Overview
Following the implementation of rate limiting and concurrency controls in PR #165, we should refactor the concurrency management components from mcp_tools/command_executor/ into the utils/ directory to make them reusable across the entire codebase.
Background
The current implementation includes several valuable concurrency management components that are currently tightly coupled to the CommandExecutor:
- Rate Limiter (
rate_limiter.py) - Token bucket and sliding window rate limiting - Concurrency Manager (
concurrency_manager.py) - Process queuing and concurrency controls - Resource Monitor (
resource_monitor.py) - Resource usage monitoring and enforcement - Enhanced Types (
types.py) - Configuration and response types for rate limiting/concurrency
These components implement generic patterns that could benefit other parts of the system.
Proposed Refactoring
1. Create New Utils Module Structure
utils/
├── concurrency_control/
│ ├── __init__.py
│ ├── rate_limiter.py # Token bucket & sliding window rate limiting
│ ├── concurrency_manager.py # Process queuing & concurrency controls
│ ├── resource_monitor.py # Resource usage monitoring
│ ├── types.py # Shared types and configurations
│ ├── py.typed # Type hints marker
│ └── tests/
│ ├── __init__.py
│ ├── test_rate_limiter.py
│ ├── test_concurrency_manager.py
│ ├── test_resource_monitor.py
│ └── test_integration.py
2. Decouple from CommandExecutor
Current Issues:
- Components are tightly coupled to CommandExecutor's process management
- Hard-coded assumptions about process tokens and PIDs
- Direct dependencies on CommandExecutor's internal state
Proposed Solutions:
- Abstract Process Interface: Create a generic
ProcessInterfacethat can be implemented by CommandExecutor or other process managers - Event-Based Architecture: Use callbacks/events instead of direct method calls
- Dependency Injection: Allow components to accept configurable backends
- Generic Resource Types: Support monitoring different resource types (not just processes)
3. Enhanced Reusability Features
Rate Limiter Enhancements
- Support for different resource types (API calls, file operations, network requests)
- Pluggable storage backends (memory, Redis, database)
- Distributed rate limiting capabilities
- Custom rate limiting algorithms
Concurrency Manager Enhancements
- Generic task queuing (not just process execution)
- Priority-based queuing
- Multiple queue types (FIFO, LIFO, Priority)
- Pluggable queue backends (memory, Redis, database)
Resource Monitor Enhancements
- Monitor different resource types (memory, CPU, disk, network)
- Custom resource limit policies
- Resource usage prediction and alerting
- Integration with system monitoring tools
4. Migration Strategy
Phase 1: Extract and Generalize (Week 1)
- Create utils/concurrency_control module structure
- Extract components with minimal changes
- Create abstract interfaces for process management
- Update imports in CommandExecutor
Phase 2: Decouple and Enhance (Week 2)
- Implement ProcessInterface abstraction
- Refactor components to use dependency injection
- Add configuration validation and error handling
- Create comprehensive test suite
Phase 3: Optimize and Extend (Week 3)
- Add pluggable backends support
- Implement distributed capabilities
- Add monitoring and metrics
- Performance optimization
5. Benefits
For CommandExecutor
- Cleaner Architecture: Separation of concerns between execution and concurrency control
- Easier Testing: Components can be tested independently
- Better Maintainability: Focused responsibilities
For the Codebase
- Reusability: Other components can use rate limiting and concurrency control
- Consistency: Standardized approach to concurrency across the system
- Extensibility: Easy to add new concurrency patterns
Potential Use Cases
- API Rate Limiting: Limit requests to external APIs
- File Processing: Control concurrent file operations
- Database Operations: Manage database connection pools
- Network Operations: Throttle network requests
- Background Jobs: Queue and manage background tasks
6. Implementation Details
Abstract Process Interface
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
class ProcessInterface(ABC):
"""Abstract interface for process management"""
@abstractmethod
async def get_process_info(self, process_id: str) -> Optional[Dict[str, Any]]:
"""Get information about a process"""
pass
@abstractmethod
async def terminate_process(self, process_id: str) -> bool:
"""Terminate a process"""
pass
@abstractmethod
async def is_process_running(self, process_id: str) -> bool:
"""Check if a process is still running"""
passConfiguration System
from pydantic import BaseModel
from typing import Optional, Dict, Any
class ConcurrencyConfig(BaseModel):
"""Configuration for concurrency control"""
# Rate limiting
rate_limit_enabled: bool = True
requests_per_minute: int = 60
burst_size: int = 10
# Concurrency control
max_concurrent: int = 5
max_per_user: int = 2
queue_size: int = 100
# Resource monitoring
memory_limit_mb: int = 1024
cpu_limit_percent: float = 80.0
execution_timeout: int = 300
# Backend configuration
backend_type: str = "memory" # memory, redis, database
backend_config: Dict[str, Any] = {}7. Testing Strategy
- Unit Tests: Test each component independently
- Integration Tests: Test component interactions
- Performance Tests: Verify rate limiting and concurrency behavior
- Stress Tests: Test under high load conditions
- Compatibility Tests: Ensure CommandExecutor continues to work
8. Documentation Requirements
- API Documentation: Complete API reference for all components
- Usage Examples: Common use cases and patterns
- Migration Guide: How to migrate from old to new architecture
- Configuration Reference: All configuration options and defaults
- Performance Guide: Tuning and optimization recommendations
Acceptance Criteria
- All concurrency components moved to
utils/concurrency_control/ - Components are decoupled from CommandExecutor specifics
- Abstract interfaces defined for extensibility
- Comprehensive test suite with >95% coverage
- CommandExecutor continues to work with new architecture
- All existing tests pass
- Documentation updated
- Performance benchmarks show no regression
Priority
Medium-High - This refactoring will improve code organization and enable reuse of valuable concurrency patterns across the codebase.
Labels
enhancementrefactoringarchitectureconcurrencyutils
Related Issues
- Closes: N/A (new architectural improvement)
- Related: [HIGH] Add Rate Limiting and Process Concurrency Controls #141 (original rate limiting implementation)
- Blocks: Future concurrency-related features
Note: This refactoring should be done after PR #165 is merged to avoid conflicts with the initial implementation.