From 64a95e219b2cd1b2e32a420701be26d68b1991de Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 15 Sep 2025 00:35:53 +0000 Subject: [PATCH 1/2] Initial plan From b2fe0c4280efc9f5c6a869487ae5af8242afa927 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 15 Sep 2025 00:52:31 +0000 Subject: [PATCH 2/2] Complete JsonAI workflow automation and API testing enhancements Co-authored-by: kishoretvk <13399112+kishoretvk@users.noreply.github.com> --- enhanced_features_guide.md | 822 +++++++++++++++++++++++ examples/example_postman_collection.json | 69 ++ examples/example_workflow.json | 54 ++ examples/user_schema.json | 52 ++ examples/workflow_examples.py | 604 +++++++++++++++++ gap_analysis_report.md | 317 +++++++++ jsonAI/api_collection_tester.py | 614 +++++++++++++++++ jsonAI/cli.py | 220 +++++- jsonAI/enhanced_cli.py | 544 +++++++++++++++ jsonAI/enhanced_workflow_engine.py | 817 ++++++++++++++++++++++ tests/test_enhanced_features.py | 495 ++++++++++++++ 11 files changed, 4582 insertions(+), 26 deletions(-) create mode 100644 enhanced_features_guide.md create mode 100644 examples/example_postman_collection.json create mode 100644 examples/example_workflow.json create mode 100644 examples/user_schema.json create mode 100644 examples/workflow_examples.py create mode 100644 gap_analysis_report.md create mode 100644 jsonAI/api_collection_tester.py create mode 100644 jsonAI/enhanced_cli.py create mode 100644 jsonAI/enhanced_workflow_engine.py create mode 100644 tests/test_enhanced_features.py diff --git a/enhanced_features_guide.md b/enhanced_features_guide.md new file mode 100644 index 0000000..66017e4 --- /dev/null +++ b/enhanced_features_guide.md @@ -0,0 +1,822 @@ +# JsonAI Enhanced Features Installation and Usage Guide + +## Overview + +This guide covers the new enhanced features added to JsonAI for workflow automation and API collection testing. These enhancements transform JsonAI from a simple JSON generation library into a comprehensive automation and testing platform. + +## Installation + +### Prerequisites + +- Python 3.9 or higher +- pip or poetry for package management + +### Standard Installation + +```bash +pip install jsonai +``` + +### Development Installation + +```bash +git clone https://github.com/kishoretvk/jsonAI.git +cd jsonAI +pip install -e . +``` + +### Required Dependencies for Enhanced Features + +```bash +pip install aiohttp pydantic click PyYAML +``` + +## New Features Overview + +### 1. Enhanced Workflow Automation Engine + +- **YAML/JSON workflow definitions** +- **Conditional branching and loops** +- **Parallel execution support** +- **Error handling and retry mechanisms** +- **Workflow scheduling** +- **Progress monitoring and reporting** + +### 2. API Collection Testing Framework + +- **Postman collection import/export** +- **OpenAPI specification testing** +- **Automated test data generation** +- **Response validation** +- **Performance metrics collection** +- **Comprehensive test reporting** + +### 3. Enhanced CLI Interface + +- **Workflow management commands** +- **API testing commands** +- **Data pipeline operations** +- **Integration management** +- **Project initialization utilities** + +## Quick Start Examples + +### 1. 
Basic Workflow Creation + +Create a workflow file `example_workflow.yaml`: + +```yaml +name: "api_testing_workflow" +version: "1.0" +description: "Simple API testing workflow" +variables: + api_base_url: "https://jsonplaceholder.typicode.com" + +steps: + - id: "generate_user_data" + name: "Generate Test User Data" + type: "json_generation" + config: + schema: + type: "object" + properties: + name: {type: "string"} + email: {type: "string", format: "email"} + age: {type: "integer", minimum: 18} + prompt: "Generate a realistic user profile" + output_variable: "user_data" + + - id: "send_api_request" + name: "Test API Endpoint" + type: "api_request" + depends_on: ["generate_user_data"] + config: + method: "POST" + url: "${api_base_url}/users" + headers: + Content-Type: "application/json" + body: "${user_data}" + expected_status: 201 +``` + +Run the workflow: + +```bash +# Create workflow +jsonai workflow create example_workflow.yaml + +# Run workflow +jsonai workflow run api_testing_workflow + +# Check status +jsonai workflow list +``` + +### 2. API Collection Testing + +Import and test a Postman collection: + +```bash +# Import Postman collection +jsonai test import-collection --source postman my_api_collection.json + +# Run tests +jsonai test run-collection --environment dev + +# Generate HTML report +jsonai test report --format html --output test_report.html +``` + +### 3. Advanced Data Generation + +Generate multiple test data items: + +```bash +# Generate 100 user records +jsonai generate --schema user_schema.json --prompt "Generate user data" --count 100 --output users.json + +# Generate in different formats +jsonai generate --schema user_schema.json --prompt "Generate user data" --output-format yaml --output users.yaml +``` + +## Workflow Configuration Reference + +### Workflow Definition Structure + +```yaml +name: "workflow_name" # Required: Unique workflow name +version: "1.0" # Required: Version string +description: "Description" # Optional: Workflow description + +variables: # Optional: Global variables + key: "value" + +steps: # Required: List of workflow steps + - id: "step_id" # Required: Unique step identifier + name: "Step Name" # Required: Human-readable name + type: "step_type" # Required: Step type (see below) + depends_on: ["step1"] # Optional: Dependencies + parallel: true # Optional: Enable parallel execution + retry_count: 3 # Optional: Retry attempts + timeout: 300 # Optional: Timeout in seconds + on_failure: "continue" # Optional: stop|continue|retry + conditions: # Optional: Execution conditions + variable_equals: "var=value" + config: # Required: Step-specific configuration + # Configuration varies by step type + +error_handling: # Optional: Global error handling + retry_count: 3 + timeout: 300 + on_failure: "stop" + +schedule: # Optional: Workflow scheduling + type: "daily" # daily|interval|cron + time: "09:00" # For daily schedules + +notifications: # Optional: Notifications + on_success: + type: "slack" + webhook_url: "https://..." + on_failure: + type: "email" + to: "admin@company.com" +``` + +### Supported Step Types + +#### 1. JSON Generation (`json_generation`) + +```yaml +type: "json_generation" +config: + schema: # JSON schema for generation + type: "object" + properties: + name: {type: "string"} + prompt: "Generate test data" # Generation prompt + output_variable: "var_name" # Variable to store result + jsonformer_options: # Optional Jsonformer options + temperature: 0.7 + max_tokens: 150 +``` + +#### 2. 
API Request (`api_request`) + +```yaml +type: "api_request" +config: + method: "POST" # HTTP method + url: "https://api.example.com/endpoint" + headers: # Request headers + Content-Type: "application/json" + Authorization: "Bearer ${token}" + body: "${data_variable}" # Request body (supports variables) + expected_status: 200 # Expected HTTP status code + output_variable: "response" # Variable to store response +``` + +#### 3. API Test Collection (`api_test_collection`) + +```yaml +type: "api_test_collection" +config: + collection_path: "./collection.json" # Path to collection file + collection_type: "postman" # postman|openapi + environment: "staging" # Environment name + output_variable: "test_results" # Variable to store results +``` + +#### 4. Data Validation (`data_validation`) + +```yaml +type: "data_validation" +config: + data_variable: "data_to_validate" # Variable containing data + schema: # Validation schema + type: "object" + required: ["name", "email"] + fail_on_invalid: true # Fail workflow if invalid +``` + +#### 5. Data Transformation (`data_transformation`) + +```yaml +type: "data_transformation" +config: + input_variable: "input_data" # Input data variable + transformation: # Transformation configuration + type: "filter" # filter|map|aggregate + condition: # For filter operations + type: "equals" + field: "status" + value: "active" + output_variable: "filtered_data" # Output variable +``` + +#### 6. Conditional (`conditional`) + +```yaml +type: "conditional" +config: + condition: # Condition to evaluate + type: "equals" # equals|greater_than|contains + field: "status" # Field to check + value: "success" # Expected value +``` + +#### 7. Loop (`loop`) + +```yaml +type: "loop" +config: + loop_variable: "items" # Variable containing array + steps: # Steps to execute for each item + - type: "api_request" + config: + method: "POST" + url: "https://api.example.com/process" + body: "${_current_item}" # Current loop item +``` + +#### 8. Parallel (`parallel`) + +```yaml +type: "parallel" +config: + steps: # Steps to execute in parallel + - type: "api_request" + config: {...} + - type: "json_generation" + config: {...} +``` + +#### 9. Delay (`delay`) + +```yaml +type: "delay" +config: + seconds: 30 # Delay duration in seconds +``` + +#### 10. Script (`script`) + +```yaml +type: "script" +config: + language: "python" # Script language + script: | # Script content + result = {"processed": True} + print(f"Processing complete") +``` + +#### 11. Notification (`notification`) + +```yaml +type: "notification" +config: + type: "slack" # email|slack|webhook + message: "Workflow ${workflow_name} completed" + webhook_url: "https://hooks.slack.com/..." # For webhook notifications +``` + +#### 12. 
Cleanup (`cleanup`) + +```yaml +type: "cleanup" +config: + variables: ["temp_data", "cache"] # Variables to remove +``` + +## API Collection Testing + +### Postman Collection Format + +JsonAI can import standard Postman collection files: + +```json +{ + "info": { + "name": "My API Collection", + "description": "Collection description" + }, + "variable": [ + {"key": "baseUrl", "value": "https://api.example.com"} + ], + "item": [ + { + "name": "Get Users", + "request": { + "method": "GET", + "url": { + "raw": "{{baseUrl}}/users", + "host": ["{{baseUrl}}"], + "path": ["users"] + }, + "header": [ + {"key": "Accept", "value": "application/json"} + ] + } + } + ] +} +``` + +### OpenAPI Specification Support + +JsonAI can also import OpenAPI 3.0 specifications: + +```yaml +openapi: 3.0.0 +info: + title: My API + version: 1.0.0 +servers: + - url: https://api.example.com +paths: + /users: + get: + operationId: getUsers + responses: + '200': + description: Success + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/User' + post: + operationId: createUser + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/UserInput' + responses: + '201': + description: Created +components: + schemas: + User: + type: object + properties: + id: {type: integer} + name: {type: string} + email: {type: string} + UserInput: + type: object + properties: + name: {type: string} + email: {type: string} + required: [name, email] +``` + +## CLI Command Reference + +### Workflow Commands + +```bash +# Create workflow from file +jsonai workflow create workflow.yaml + +# Run workflow +jsonai workflow run workflow_name + +# Run with variables +jsonai workflow run workflow_name --variables '{"env": "prod"}' +jsonai workflow run workflow_name --variables-file vars.json + +# List workflows +jsonai workflow list + +# Check execution status +jsonai workflow status + +# Schedule workflow +jsonai workflow schedule workflow_name --type daily --time "09:00" +``` + +### API Testing Commands + +```bash +# Import collections +jsonai test import-collection --source postman collection.json +jsonai test import-collection --source openapi api_spec.yaml + +# Run tests +jsonai test run-collection +jsonai test run-collection --environment staging + +# Generate reports +jsonai test report --format html --output report.html +jsonai test report --format json --output report.json +jsonai test report --format junit --output results.xml +``` + +### Data Generation Commands + +```bash +# Basic generation +jsonai generate --schema schema.json --prompt "Generate data" + +# Multiple items +jsonai generate --schema schema.json --prompt "Generate data" --count 100 + +# Different formats +jsonai generate --schema schema.json --prompt "Generate data" --output-format yaml + +# Output to file +jsonai generate --schema schema.json --prompt "Generate data" --output data.json + +# Use Ollama backend +jsonai generate --schema schema.json --prompt "Generate data" --use-ollama --ollama-model mistral:latest +``` + +### Data Validation Commands + +```bash +# Validate data against schema +jsonai data validate data.json schema.json + +# Transform data +jsonai data transform data.json transform_config.yaml --output transformed.json +``` + +### Utility Commands + +```bash +# Initialize new project +jsonai utils init my_project --type full + +# Validate workflow +jsonai utils validate-workflow workflow.yaml + +# Validate schema +jsonai generate-schema --description "User profile with name, email, and age" 
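+
+# Example (sketch): chain schema generation into data validation.
+# Assumes generate-schema writes the generated schema to stdout; redirect it to a file first.
+jsonai generate-schema --description "User profile with name, email, and age" > user_schema.json
+jsonai data validate users.json user_schema.json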
+``` + +### Server Commands + +```bash +# Start API server +jsonai server start --host 0.0.0.0 --port 8000 + +# Start with multiple workers +jsonai server start --workers 4 +``` + +## Integration Examples + +### CI/CD Integration + +#### GitHub Actions + +```yaml +name: API Testing Workflow +on: [push, pull_request] + +jobs: + api-tests: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Setup Python + uses: actions/setup-python@v2 + with: + python-version: '3.9' + + - name: Install JsonAI + run: pip install jsonai + + - name: Run API Tests + run: | + jsonai test import-collection --source postman tests/api_collection.json + jsonai test run-collection --environment staging + + - name: Generate Test Report + run: | + jsonai test report --format junit --output test-results.xml + + - name: Upload Test Results + uses: actions/upload-artifact@v2 + with: + name: test-results + path: test-results.xml +``` + +#### Jenkins Pipeline + +```groovy +pipeline { + agent any + + stages { + stage('API Testing') { + steps { + script { + sh 'pip install jsonai' + sh 'jsonai workflow run api_testing_workflow' + sh 'jsonai test report --format html --output test_report.html' + } + } + } + } + + post { + always { + publishHTML([ + allowMissing: false, + alwaysLinkToLastBuild: true, + keepAll: true, + reportDir: '.', + reportFiles: 'test_report.html', + reportName: 'API Test Report' + ]) + } + } +} +``` + +### Docker Integration + +```dockerfile +FROM python:3.9-slim + +WORKDIR /app + +# Install JsonAI +RUN pip install jsonai + +# Copy workflow and collection files +COPY workflows/ ./workflows/ +COPY collections/ ./collections/ +COPY schemas/ ./schemas/ + +# Set environment variables +ENV PYTHONPATH=/app + +# Run workflow +CMD ["jsonai", "workflow", "run", "api_testing_workflow"] +``` + +### Docker Compose + +```yaml +version: '3.8' + +services: + jsonai-server: + image: jsonai:latest + ports: + - "8000:8000" + environment: + - JSONAI_MODEL_BACKEND=ollama + - OLLAMA_HOST=http://ollama:11434 + command: jsonai server start --host 0.0.0.0 --port 8000 + + ollama: + image: ollama/ollama:latest + ports: + - "11434:11434" + volumes: + - ollama_data:/root/.ollama + + workflow-runner: + image: jsonai:latest + volumes: + - ./workflows:/app/workflows + - ./collections:/app/collections + environment: + - JSONAI_API_URL=http://jsonai-server:8000 + command: jsonai workflow run scheduled_tests + depends_on: + - jsonai-server + +volumes: + ollama_data: +``` + +## Best Practices + +### 1. Workflow Design + +- **Modular Steps**: Keep steps small and focused on single tasks +- **Error Handling**: Always define error handling strategies +- **Variable Management**: Use meaningful variable names and proper scoping +- **Dependencies**: Clearly define step dependencies to ensure proper execution order + +### 2. API Testing + +- **Data Validation**: Always validate API responses against schemas +- **Environment Management**: Use environment-specific configurations +- **Test Data**: Generate realistic test data for better coverage +- **Reporting**: Generate comprehensive reports for stakeholder review + +### 3. Performance Optimization + +- **Parallel Execution**: Use parallel steps when operations are independent +- **Caching**: Leverage JsonAI's built-in caching for repeated operations +- **Batch Processing**: Use batch operations for multiple similar requests +- **Timeouts**: Set appropriate timeouts for long-running operations + +### 4. 
Security Considerations + +- **Sensitive Data**: Never hardcode secrets in workflow files +- **Environment Variables**: Use environment variables for sensitive configuration +- **Access Control**: Implement proper authentication for API testing +- **Data Sanitization**: Sanitize generated test data before using in production environments + +## Troubleshooting + +### Common Issues + +#### 1. Workflow Execution Failures + +```bash +# Check workflow logs +jsonai workflow status + +# Validate workflow configuration +jsonai utils validate-workflow workflow.yaml + +# Test individual steps +jsonai generate --schema step_schema.json --prompt "test" +``` + +#### 2. API Collection Import Issues + +```bash +# Verify collection format +jsonai test validate-collection collection.json + +# Check for missing dependencies +pip install aiohttp pydantic + +# Test with dummy backend +jsonai generate --schema test_schema.json --prompt "test" --model dummy +``` + +#### 3. Model Backend Issues + +```bash +# Test Ollama connection +curl http://localhost:11434/api/tags + +# Use dummy backend for testing +jsonai generate --schema schema.json --prompt "test" --model dummy + +# Check model availability +jsonai models list +``` + +### Performance Issues + +#### 1. Slow Generation + +- Use smaller, optimized models for testing +- Implement caching for repeated requests +- Use batch processing for multiple items +- Optimize JSON schemas to be more specific + +#### 2. Memory Issues + +- Reduce batch sizes for large datasets +- Implement cleanup steps in workflows +- Use streaming for large responses +- Monitor memory usage during execution + +### Debug Mode + +Enable debug mode for detailed logging: + +```bash +# Enable debug in CLI +jsonai generate --schema schema.json --prompt "test" --debug + +# Enable debug in workflow +jsonai workflow run workflow_name --variables '{"debug": true}' +``` + +## Advanced Features + +### Custom Step Types + +Create custom step executors: + +```python +from jsonAI.enhanced_workflow_engine import WorkflowEngine, WorkflowStepType + +class CustomWorkflowEngine(WorkflowEngine): + def __init__(self): + super().__init__() + self.step_executors[WorkflowStepType.CUSTOM] = self._execute_custom_step + + async def _execute_custom_step(self, step, execution): + # Custom step implementation + return {"result": "custom_step_completed"} +``` + +### Plugin Development + +Create JsonAI plugins: + +```python +from jsonAI.plugin_system import PluginRegistry + +class MyPlugin: + def __init__(self): + self.name = "my_plugin" + + def execute(self, config): + # Plugin implementation + return {"status": "success"} + +# Register plugin +registry = PluginRegistry() +registry.register_plugin(MyPlugin()) +``` + +### Custom Backends + +Implement custom model backends: + +```python +from jsonAI.model_backends import ModelBackend + +class CustomBackend(ModelBackend): + def __init__(self, config): + self.config = config + + def generate(self, prompt, schema): + # Custom generation logic + return {"generated": "data"} +``` + +## Support and Contributing + +### Getting Help + +- **Documentation**: [GitHub Repository](https://github.com/kishoretvk/jsonAI) +- **Issues**: Report bugs and feature requests on GitHub +- **Community**: Join discussions in GitHub Discussions + +### Contributing + +1. Fork the repository +2. Create a feature branch +3. Implement your changes +4. Add tests for new features +5. 
Submit a pull request + +### Development Setup + +```bash +git clone https://github.com/kishoretvk/jsonAI.git +cd jsonAI +pip install -e ".[dev]" +pytest tests/ +``` + +This enhanced version of JsonAI provides a comprehensive platform for workflow automation and API testing, making it suitable for enterprise use cases and complex automation scenarios. \ No newline at end of file diff --git a/examples/example_postman_collection.json b/examples/example_postman_collection.json new file mode 100644 index 0000000..e31c9c5 --- /dev/null +++ b/examples/example_postman_collection.json @@ -0,0 +1,69 @@ +{ + "info": { + "name": "JSONPlaceholder API Tests", + "description": "Test collection for JSONPlaceholder API" + }, + "variable": [ + { + "key": "baseUrl", + "value": "https://jsonplaceholder.typicode.com" + } + ], + "item": [ + { + "name": "Get All Users", + "request": { + "method": "GET", + "url": { + "raw": "{{baseUrl}}/users", + "host": ["{{baseUrl}}"], + "path": ["users"] + }, + "header": [ + { + "key": "Accept", + "value": "application/json" + } + ] + } + }, + { + "name": "Get User by ID", + "request": { + "method": "GET", + "url": { + "raw": "{{baseUrl}}/users/1", + "host": ["{{baseUrl}}"], + "path": ["users", "1"] + }, + "header": [ + { + "key": "Accept", + "value": "application/json" + } + ] + } + }, + { + "name": "Create User", + "request": { + "method": "POST", + "url": { + "raw": "{{baseUrl}}/users", + "host": ["{{baseUrl}}"], + "path": ["users"] + }, + "header": [ + { + "key": "Content-Type", + "value": "application/json" + } + ], + "body": { + "mode": "raw", + "raw": "{\"name\": \"Test User\", \"username\": \"testuser\", \"email\": \"test@example.com\"}" + } + } + } + ] +} \ No newline at end of file diff --git a/examples/example_workflow.json b/examples/example_workflow.json new file mode 100644 index 0000000..ece507a --- /dev/null +++ b/examples/example_workflow.json @@ -0,0 +1,54 @@ +{ + "name": "example_workflow", + "version": "1.0", + "description": "Example workflow for testing JsonAI enhancements", + "variables": { + "api_base_url": "https://jsonplaceholder.typicode.com", + "environment": "test" + }, + "steps": [ + { + "id": "generate_user_data", + "name": "Generate Test User Data", + "type": "json_generation", + "config": { + "schema": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "username": {"type": "string"}, + "email": {"type": "string", "format": "email"}, + "phone": {"type": "string"}, + "website": {"type": "string"} + }, + "required": ["name", "username", "email"] + }, + "prompt": "Generate a realistic user profile for API testing", + "output_variable": "test_user_data" + } + }, + { + "id": "validate_user_data", + "name": "Validate Generated User Data", + "type": "data_validation", + "depends_on": ["generate_user_data"], + "config": { + "data_variable": "test_user_data", + "schema": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "email": {"type": "string", "format": "email"} + }, + "required": ["name", "email"] + }, + "fail_on_invalid": false + } + } + ], + "error_handling": { + "retry_count": 2, + "timeout": 60, + "on_failure": "continue" + } +} \ No newline at end of file diff --git a/examples/user_schema.json b/examples/user_schema.json new file mode 100644 index 0000000..37033a1 --- /dev/null +++ b/examples/user_schema.json @@ -0,0 +1,52 @@ +{ + "type": "object", + "properties": { + "id": { + "type": "integer", + "minimum": 1 + }, + "name": { + "type": "string", + "minLength": 1 + }, + "username": { + "type": 
"string", + "minLength": 1 + }, + "email": { + "type": "string", + "format": "email" + }, + "address": { + "type": "object", + "properties": { + "street": {"type": "string"}, + "suite": {"type": "string"}, + "city": {"type": "string"}, + "zipcode": {"type": "string"}, + "geo": { + "type": "object", + "properties": { + "lat": {"type": "string"}, + "lng": {"type": "string"} + } + } + } + }, + "phone": { + "type": "string" + }, + "website": { + "type": "string" + }, + "company": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "catchPhrase": {"type": "string"}, + "bs": {"type": "string"} + } + } + }, + "required": ["name", "username", "email"] +} \ No newline at end of file diff --git a/examples/workflow_examples.py b/examples/workflow_examples.py new file mode 100644 index 0000000..b0453cd --- /dev/null +++ b/examples/workflow_examples.py @@ -0,0 +1,604 @@ +""" +Example workflow configurations for JsonAI + +This file contains comprehensive examples of workflow automation scenarios +including API testing, data generation, and integration workflows. +""" + +# Example 1: API Testing Workflow +api_testing_workflow = { + "name": "api_testing_workflow", + "version": "1.0", + "description": "Comprehensive API testing workflow with data generation", + "variables": { + "api_base_url": "https://jsonplaceholder.typicode.com", + "environment": "dev", + "max_retries": 3 + }, + "steps": [ + { + "id": "generate_user_data", + "name": "Generate Test User Data", + "type": "json_generation", + "config": { + "schema": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "username": {"type": "string"}, + "email": {"type": "string", "format": "email"}, + "phone": {"type": "string"}, + "website": {"type": "string", "format": "uri"}, + "address": { + "type": "object", + "properties": { + "street": {"type": "string"}, + "suite": {"type": "string"}, + "city": {"type": "string"}, + "zipcode": {"type": "string"}, + "geo": { + "type": "object", + "properties": { + "lat": {"type": "string"}, + "lng": {"type": "string"} + } + } + } + }, + "company": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "catchPhrase": {"type": "string"}, + "bs": {"type": "string"} + } + } + }, + "required": ["name", "username", "email"] + }, + "prompt": "Generate a realistic user profile for API testing", + "output_variable": "test_user_data" + } + }, + { + "id": "create_user_api_test", + "name": "Test Create User API", + "type": "api_request", + "depends_on": ["generate_user_data"], + "config": { + "method": "POST", + "url": "${api_base_url}/users", + "headers": { + "Content-Type": "application/json" + }, + "body": "${test_user_data}", + "expected_status": 201, + "output_variable": "create_user_response" + }, + "retry_count": 3, + "timeout": 30 + }, + { + "id": "validate_user_creation", + "name": "Validate User Creation Response", + "type": "data_validation", + "depends_on": ["create_user_api_test"], + "config": { + "data_variable": "create_user_response", + "schema": { + "type": "object", + "properties": { + "body": { + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + "email": {"type": "string", "format": "email"} + }, + "required": ["id", "name", "email"] + }, + "status_code": {"type": "integer", "enum": [201]} + } + }, + "fail_on_invalid": True + } + }, + { + "id": "get_user_api_test", + "name": "Test Get User API", + "type": "api_request", + "depends_on": ["validate_user_creation"], + "config": { + "method": "GET", + "url": 
"${api_base_url}/users/1", + "headers": { + "Accept": "application/json" + }, + "expected_status": 200, + "output_variable": "get_user_response" + } + }, + { + "id": "cleanup_test_data", + "name": "Cleanup Test Data", + "type": "cleanup", + "depends_on": ["get_user_api_test"], + "config": { + "variables": ["test_user_data", "create_user_response"] + }, + "on_failure": "always" + } + ], + "error_handling": { + "retry_count": 3, + "timeout": 300, + "on_failure": "continue" + }, + "notifications": { + "on_success": { + "type": "webhook", + "url": "https://hooks.slack.com/workflow/success" + }, + "on_failure": { + "type": "email", + "to": "dev-team@company.com", + "subject": "API Testing Workflow Failed" + } + } +} + +# Example 2: Data Pipeline Workflow +data_pipeline_workflow = { + "name": "data_pipeline_workflow", + "version": "1.0", + "description": "Data generation and transformation pipeline", + "variables": { + "batch_size": 100, + "output_format": "json" + }, + "steps": [ + { + "id": "generate_customer_data", + "name": "Generate Customer Data Batch", + "type": "json_generation", + "config": { + "schema": { + "type": "array", + "maxItems": 100, + "items": { + "type": "object", + "properties": { + "customer_id": {"type": "string", "format": "uuid"}, + "first_name": {"type": "string"}, + "last_name": {"type": "string"}, + "email": {"type": "string", "format": "email"}, + "age": {"type": "integer", "minimum": 18, "maximum": 100}, + "subscription_tier": { + "type": "string", + "enum": ["basic", "premium", "enterprise"] + }, + "created_at": {"type": "string", "format": "date-time"}, + "total_spent": {"type": "number", "minimum": 0} + }, + "required": ["customer_id", "first_name", "last_name", "email"] + } + }, + "prompt": "Generate ${batch_size} realistic customer records for a SaaS platform", + "output_variable": "customer_data" + }, + "parallel": True + }, + { + "id": "filter_premium_customers", + "name": "Filter Premium Customers", + "type": "data_transformation", + "depends_on": ["generate_customer_data"], + "config": { + "input_variable": "customer_data", + "transformation": { + "type": "filter", + "condition": { + "type": "equals", + "field": "subscription_tier", + "value": "premium" + } + }, + "output_variable": "premium_customers" + } + }, + { + "id": "validate_customer_data", + "name": "Validate Customer Data Quality", + "type": "data_validation", + "depends_on": ["generate_customer_data"], + "config": { + "data_variable": "customer_data", + "schema": { + "type": "array", + "items": { + "type": "object", + "required": ["customer_id", "email"] + } + }, + "fail_on_invalid": False + } + }, + { + "id": "send_to_api", + "name": "Send Data to API", + "type": "api_request", + "depends_on": ["validate_customer_data", "filter_premium_customers"], + "config": { + "method": "POST", + "url": "https://api.crm.com/customers/batch", + "headers": { + "Content-Type": "application/json", + "Authorization": "Bearer ${api_token}" + }, + "body": "${customer_data}", + "expected_status": 200 + }, + "conditions": { + "step_success": "validate_customer_data" + } + } + ] +} + +# Example 3: Load Testing Workflow +load_testing_workflow = { + "name": "load_testing_workflow", + "version": "1.0", + "description": "Load testing workflow with parallel requests", + "variables": { + "target_url": "https://api.example.com", + "concurrent_users": 50, + "requests_per_user": 10 + }, + "steps": [ + { + "id": "generate_test_data_batch", + "name": "Generate Test Data for Load Testing", + "type": "json_generation", + 
"config": { + "schema": { + "type": "array", + "maxItems": 500, + "items": { + "type": "object", + "properties": { + "request_id": {"type": "string", "format": "uuid"}, + "payload": { + "type": "object", + "properties": { + "action": {"type": "string", "enum": ["create", "read", "update"]}, + "data": {"type": "object"} + } + } + } + } + }, + "prompt": "Generate ${concurrent_users * requests_per_user} test requests for load testing", + "output_variable": "load_test_data" + } + }, + { + "id": "parallel_load_test", + "name": "Execute Parallel Load Test", + "type": "parallel", + "depends_on": ["generate_test_data_batch"], + "config": { + "steps": [ + { + "id": "load_test_batch_1", + "type": "api_request", + "config": { + "method": "POST", + "url": "${target_url}/test", + "body": "${load_test_data}", + "timeout": 60 + } + } + ] + }, + "parallel": True + } + ] +} + +# Example 4: CI/CD Integration Workflow +cicd_workflow = { + "name": "cicd_integration_workflow", + "version": "1.0", + "description": "CI/CD pipeline integration with API testing", + "variables": { + "git_branch": "main", + "environment": "staging", + "deploy_url": "https://staging.api.com" + }, + "steps": [ + { + "id": "wait_for_deployment", + "name": "Wait for Deployment", + "type": "delay", + "config": { + "seconds": 30 + } + }, + { + "id": "health_check", + "name": "API Health Check", + "type": "api_request", + "depends_on": ["wait_for_deployment"], + "config": { + "method": "GET", + "url": "${deploy_url}/health", + "expected_status": 200, + "output_variable": "health_response" + }, + "retry_count": 5, + "timeout": 10 + }, + { + "id": "run_smoke_tests", + "name": "Run Smoke Tests", + "type": "api_test_collection", + "depends_on": ["health_check"], + "config": { + "collection_path": "./collections/smoke_tests.json", + "collection_type": "postman", + "environment": "${environment}", + "output_variable": "smoke_test_results" + } + }, + { + "id": "validate_smoke_tests", + "name": "Validate Smoke Test Results", + "type": "conditional", + "depends_on": ["run_smoke_tests"], + "config": { + "condition": { + "type": "greater_than", + "field": "success_rate", + "value": 0.95 + } + } + }, + { + "id": "notify_success", + "name": "Notify Deployment Success", + "type": "notification", + "depends_on": ["validate_smoke_tests"], + "config": { + "type": "slack", + "message": "🚀 Deployment to ${environment} successful! All smoke tests passed.", + "channel": "#deployments" + }, + "conditions": { + "step_success": "validate_smoke_tests" + } + }, + { + "id": "notify_failure", + "name": "Notify Deployment Failure", + "type": "notification", + "depends_on": ["validate_smoke_tests"], + "config": { + "type": "slack", + "message": "❌ Deployment to ${environment} failed! 
Smoke tests did not pass.", + "channel": "#deployments" + }, + "conditions": { + "step_failure": "validate_smoke_tests" + } + } + ], + "schedule": { + "type": "cron", + "expression": "0 */2 * * *" # Every 2 hours + } +} + +# Example 5: E-commerce Testing Workflow +ecommerce_workflow = { + "name": "ecommerce_testing_workflow", + "version": "1.0", + "description": "End-to-end e-commerce API testing workflow", + "variables": { + "store_url": "https://api.store.com", + "admin_token": "admin_secret_token" + }, + "steps": [ + { + "id": "generate_product_data", + "name": "Generate Product Data", + "type": "json_generation", + "config": { + "schema": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "description": {"type": "string"}, + "price": {"type": "number", "minimum": 0.01}, + "category": {"type": "string", "enum": ["electronics", "clothing", "books"]}, + "inventory": {"type": "integer", "minimum": 0, "maximum": 1000}, + "sku": {"type": "string"}, + "tags": {"type": "array", "items": {"type": "string"}} + }, + "required": ["name", "price", "category", "sku"] + }, + "prompt": "Generate a realistic product for an e-commerce store", + "output_variable": "product_data" + } + }, + { + "id": "create_product", + "name": "Create Product via API", + "type": "api_request", + "depends_on": ["generate_product_data"], + "config": { + "method": "POST", + "url": "${store_url}/products", + "headers": { + "Authorization": "Bearer ${admin_token}", + "Content-Type": "application/json" + }, + "body": "${product_data}", + "expected_status": 201, + "output_variable": "created_product" + } + }, + { + "id": "generate_customer_data", + "name": "Generate Customer Data", + "type": "json_generation", + "depends_on": ["create_product"], + "config": { + "schema": { + "type": "object", + "properties": { + "email": {"type": "string", "format": "email"}, + "password": {"type": "string", "minLength": 8}, + "first_name": {"type": "string"}, + "last_name": {"type": "string"}, + "address": { + "type": "object", + "properties": { + "street": {"type": "string"}, + "city": {"type": "string"}, + "state": {"type": "string"}, + "zip": {"type": "string"}, + "country": {"type": "string"} + } + } + }, + "required": ["email", "password", "first_name", "last_name"] + }, + "prompt": "Generate customer registration data", + "output_variable": "customer_data" + } + }, + { + "id": "register_customer", + "name": "Register Customer", + "type": "api_request", + "depends_on": ["generate_customer_data"], + "config": { + "method": "POST", + "url": "${store_url}/customers/register", + "headers": {"Content-Type": "application/json"}, + "body": "${customer_data}", + "expected_status": 201, + "output_variable": "registered_customer" + } + }, + { + "id": "login_customer", + "name": "Login Customer", + "type": "api_request", + "depends_on": ["register_customer"], + "config": { + "method": "POST", + "url": "${store_url}/auth/login", + "headers": {"Content-Type": "application/json"}, + "body": { + "email": "${customer_data.email}", + "password": "${customer_data.password}" + }, + "expected_status": 200, + "output_variable": "login_response" + } + }, + { + "id": "add_to_cart", + "name": "Add Product to Cart", + "type": "api_request", + "depends_on": ["login_customer"], + "config": { + "method": "POST", + "url": "${store_url}/cart/items", + "headers": { + "Authorization": "Bearer ${login_response.body.token}", + "Content-Type": "application/json" + }, + "body": { + "product_id": "${created_product.body.id}", + "quantity": 1 + }, + 
"expected_status": 200, + "output_variable": "cart_response" + } + }, + { + "id": "checkout", + "name": "Checkout Process", + "type": "api_request", + "depends_on": ["add_to_cart"], + "config": { + "method": "POST", + "url": "${store_url}/orders/checkout", + "headers": { + "Authorization": "Bearer ${login_response.body.token}", + "Content-Type": "application/json" + }, + "body": { + "payment_method": "credit_card", + "shipping_address": "${customer_data.address}" + }, + "expected_status": 201, + "output_variable": "order_response" + } + }, + { + "id": "verify_order", + "name": "Verify Order Creation", + "type": "data_validation", + "depends_on": ["checkout"], + "config": { + "data_variable": "order_response", + "schema": { + "type": "object", + "properties": { + "body": { + "type": "object", + "properties": { + "order_id": {"type": "string"}, + "status": {"type": "string", "enum": ["pending", "confirmed"]}, + "total": {"type": "number", "minimum": 0} + }, + "required": ["order_id", "status", "total"] + } + } + } + } + }, + { + "id": "cleanup_test_order", + "name": "Cleanup Test Order", + "type": "api_request", + "depends_on": ["verify_order"], + "config": { + "method": "DELETE", + "url": "${store_url}/orders/${order_response.body.order_id}", + "headers": { + "Authorization": "Bearer ${admin_token}" + }, + "expected_status": 204 + }, + "on_failure": "continue" + } + ], + "error_handling": { + "retry_count": 2, + "timeout": 60 + } +} + +# Export all examples +WORKFLOW_EXAMPLES = { + "api_testing": api_testing_workflow, + "data_pipeline": data_pipeline_workflow, + "load_testing": load_testing_workflow, + "cicd_integration": cicd_workflow, + "ecommerce_testing": ecommerce_workflow +} \ No newline at end of file diff --git a/gap_analysis_report.md b/gap_analysis_report.md new file mode 100644 index 0000000..7a3ca07 --- /dev/null +++ b/gap_analysis_report.md @@ -0,0 +1,317 @@ +# JsonAI Gap Analysis Report: Workflow Automation and API Testing + +## Executive Summary + +The JsonAI repository is a comprehensive Python library for generating structured JSON data using Large Language Models (LLMs). While it has strong foundational capabilities for JSON generation, schema validation, and basic workflow orchestration, there are significant gaps that prevent it from being an effective tool for automating workflows and testing API collections. + +## Current Functionality Assessment + +### ✅ Strengths + +1. **Core JSON Generation Engine** + - Multiple LLM backends (Ollama, OpenAI, HuggingFace Transformers) + - Comprehensive JSON Schema support (primitives, arrays, objects, enums, oneOf) + - Multiple output formats (JSON, YAML, XML, CSV) + - Schema validation with jsonschema + +2. **API Infrastructure** + - FastAPI REST API with OpenAPI documentation + - Sync/async generation endpoints + - Batch processing capabilities + - Performance monitoring and caching + +3. **Advanced Features** + - Workflow orchestration framework + - Conversational agents + - Plugin system for extensibility + - Integration hub for external services + - Tool registry and execution + +4. **Performance & Scalability** + - Multi-level caching (LRU/TTL) + - Async operations support + - Batch processing with configurable concurrency + - Performance metrics and monitoring + +### ❌ Critical Gaps for Workflow Automation + +## 1. 
**Workflow Definition and Management** + +### Current State: +- Basic `WorkflowOrchestrator` exists but lacks: + - Visual workflow designer + - Conditional branching logic + - Loop and iteration support + - Error recovery mechanisms + - Workflow versioning and rollback + +### Gaps: +- No standardized workflow definition format (YAML/JSON) +- Missing workflow validation before execution +- No workflow dependency management +- Lack of parallel execution paths +- No workflow scheduling capabilities + +## 2. **API Testing and Collection Management** + +### Current State: +- REST API exists for JSON generation +- Basic schema validation +- No built-in API testing framework + +### Critical Missing Features: +- **API Collection Management**: No support for importing/exporting API collections (Postman, Insomnia, OpenAPI) +- **Request/Response Validation**: No automated testing of API endpoints +- **Test Data Generation**: Limited integration with API testing scenarios +- **Load Testing**: No performance testing capabilities for APIs +- **Test Reporting**: No comprehensive test result reporting +- **Mock Server**: No built-in mock server for API testing + +## 3. **Data Pipeline and ETL Capabilities** + +### Gaps: +- No data transformation pipelines +- Missing data source connectors (databases, files, APIs) +- No data validation beyond JSON schema +- Limited data export/import capabilities +- No data lineage tracking + +## 4. **Integration and Connectivity** + +### Current State: +- Basic integration hub with GitHub, Slack, webhooks +- Plugin system exists but limited documentation + +### Missing Integrations: +- **CI/CD Integration**: No Jenkins, GitHub Actions, GitLab CI support +- **Testing Frameworks**: No integration with pytest, unittest, Jest +- **API Management**: No integration with API gateways (Kong, AWS API Gateway) +- **Monitoring**: No APM tool integration (New Relic, DataDog) +- **Databases**: Limited database connectivity + +## 5. **Testing Framework Deficiencies** + +### Current Testing Issues: +- Tests fail due to missing dependencies +- No automated testing for workflow scenarios +- Missing integration tests for API collections +- No performance benchmarking tests +- No end-to-end testing framework + +## 6. **Documentation and Examples** + +### Gaps: +- Missing workflow automation examples +- No API testing collection examples +- Limited deployment documentation +- No troubleshooting guides +- Missing best practices documentation + +## Proposed Solutions and Enhancements + +## 1. **Enhanced Workflow Automation Framework** + +### Implementation Plan: +```python +# Enhanced Workflow Definition +workflow_config = { + "name": "api_testing_workflow", + "version": "1.0", + "steps": [ + { + "id": "generate_test_data", + "type": "json_generation", + "schema": "user_schema.json", + "prompt": "Generate test user data", + "parallel": True + }, + { + "id": "api_test", + "type": "api_request", + "method": "POST", + "url": "https://api.example.com/users", + "depends_on": ["generate_test_data"], + "validation": { + "status_code": 201, + "response_schema": "user_response_schema.json" + } + }, + { + "id": "cleanup", + "type": "cleanup", + "depends_on": ["api_test"], + "on_failure": "always" + } + ], + "error_handling": { + "retry_count": 3, + "timeout": 30, + "fallback": "generate_default_data" + } +} +``` + +### New Components Needed: +- `WorkflowDefinitionParser` +- `WorkflowValidator` +- `WorkflowScheduler` +- `WorkflowMonitor` +- `WorkflowReporter` + +## 2. 
**API Testing Collection Framework** + +### Implementation Plan: +```python +class APICollectionTester: + def __init__(self): + self.collections = {} + self.test_results = [] + + def import_postman_collection(self, collection_path): + """Import Postman collection for testing""" + pass + + def generate_test_data_for_collection(self, collection_name): + """Generate test data for all endpoints in collection""" + pass + + def run_collection_tests(self, collection_name, environment="dev"): + """Execute all tests in collection""" + pass + + def generate_test_report(self, format="html"): + """Generate comprehensive test report""" + pass +``` + +### Required Features: +- Postman/Insomnia collection import +- OpenAPI specification testing +- Automated test data generation +- Response validation +- Performance metrics collection +- Test result reporting + +## 3. **Enhanced Integration Capabilities** + +### CI/CD Integration: +```yaml +# .github/workflows/api-testing.yml +name: API Testing Workflow +on: [push, pull_request] +jobs: + api-tests: + runs-on: ubuntu-latest + steps: + - name: Run JsonAI API Tests + run: | + jsonai-cli test-collection \ + --collection postman_collection.json \ + --environment dev \ + --generate-data \ + --report junit +``` + +### Database Integration: +```python +class DataSourceConnector: + def connect_postgres(self, connection_string): + """Connect to PostgreSQL database""" + pass + + def generate_test_data_from_schema(self, table_name): + """Generate test data based on database schema""" + pass + + def validate_data_integrity(self, test_data): + """Validate generated data against database constraints""" + pass +``` + +## 4. **Monitoring and Observability** + +### Implementation Plan: +```python +class WorkflowMonitor: + def __init__(self): + self.metrics = {} + self.alerts = {} + + def track_workflow_execution(self, workflow_id): + """Track workflow execution metrics""" + pass + + def setup_alerts(self, conditions): + """Setup alerts for workflow failures""" + pass + + def generate_dashboard_data(self): + """Generate data for monitoring dashboard""" + pass +``` + +## 5. **CLI Enhancements for Automation** + +### Enhanced CLI Commands: +```bash +# Workflow management +jsonai workflow create --config workflow.yaml +jsonai workflow run --name api_testing_workflow --environment prod +jsonai workflow status --name api_testing_workflow +jsonai workflow logs --name api_testing_workflow --tail 100 + +# API testing +jsonai test import-collection --source postman --file collection.json +jsonai test run-collection --name user_api_tests --environment staging +jsonai test generate-data --schema openapi.yaml --count 100 +jsonai test report --format html --output test_report.html + +# Data pipeline +jsonai data generate --source database --table users --count 1000 +jsonai data validate --schema user_schema.json --data test_data.json +jsonai data export --format csv --output test_data.csv +``` + +## Implementation Priority Matrix + +| Feature | Impact | Effort | Priority | +|---------|--------|--------|----------| +| API Collection Testing | High | Medium | 🔴 Critical | +| Enhanced Workflow Engine | High | High | 🔴 Critical | +| CI/CD Integration | High | Low | 🟡 High | +| Monitoring Dashboard | Medium | Medium | 🟡 High | +| Database Connectors | Medium | High | 🟢 Medium | +| Load Testing | Medium | Medium | 🟢 Medium | +| Visual Workflow Designer | Low | High | 🔵 Low | + +## Recommended Next Steps + +### Phase 1: Foundation (Weeks 1-2) +1. Fix existing test infrastructure +2. 
Create API collection testing framework +3. Enhance workflow orchestration engine +4. Add basic monitoring capabilities + +### Phase 2: Integration (Weeks 3-4) +1. Implement CI/CD integrations +2. Add database connectors +3. Create comprehensive examples +4. Improve documentation + +### Phase 3: Advanced Features (Weeks 5-6) +1. Add load testing capabilities +2. Implement advanced monitoring +3. Create workflow templates +4. Add performance optimization + +### Phase 4: Polish (Weeks 7-8) +1. Create visual workflow designer +2. Enhance reporting capabilities +3. Add enterprise features +4. Performance tuning + +## Conclusion + +While JsonAI has a solid foundation for JSON generation and basic workflow orchestration, significant enhancements are needed to make it effective for workflow automation and API testing. The proposed solutions address the critical gaps and provide a clear roadmap for development. Priority should be given to API collection testing framework and enhanced workflow engine as these provide the most immediate value for automation use cases. + +The implementation of these enhancements would transform JsonAI from a JSON generation library into a comprehensive automation and testing platform suitable for enterprise use cases. \ No newline at end of file diff --git a/jsonAI/api_collection_tester.py b/jsonAI/api_collection_tester.py new file mode 100644 index 0000000..0388ed6 --- /dev/null +++ b/jsonAI/api_collection_tester.py @@ -0,0 +1,614 @@ +""" +API Collection Testing Framework for JsonAI + +This module provides comprehensive API testing capabilities including: +- Postman/Insomnia collection import +- Automated test data generation +- API endpoint testing +- Response validation +- Performance metrics +- Test reporting +""" + +import json +import time +import asyncio +import uuid +from typing import Dict, List, Any, Optional, Union +from dataclasses import dataclass, field +from datetime import datetime +import requests +import aiohttp +from pathlib import Path + +from .main import Jsonformer +from .model_backends import DummyBackend +from .schema_validator import SchemaValidator + + +@dataclass +class APITestRequest: + """Represents a single API test request""" + id: str + name: str + method: str + url: str + headers: Dict[str, str] = field(default_factory=dict) + body: Optional[Dict[str, Any]] = None + expected_status: int = 200 + response_schema: Optional[Dict[str, Any]] = None + test_data_schema: Optional[Dict[str, Any]] = None + timeout: int = 30 + + +@dataclass +class APITestResult: + """Represents the result of an API test""" + request_id: str + name: str + success: bool + status_code: int + response_time: float + response_body: Optional[Dict[str, Any]] = None + error_message: Optional[str] = None + validation_errors: List[str] = field(default_factory=list) + timestamp: datetime = field(default_factory=datetime.now) + + +@dataclass +class APICollection: + """Represents a collection of API tests""" + name: str + description: str + base_url: str + requests: List[APITestRequest] = field(default_factory=list) + variables: Dict[str, str] = field(default_factory=dict) + auth: Optional[Dict[str, str]] = None + + +class APICollectionTester: + """ + Main class for API collection testing with JsonAI integration + """ + + def __init__(self, model_backend=None): + self.collections: Dict[str, APICollection] = {} + self.test_results: List[APITestResult] = [] + self.model_backend = model_backend or DummyBackend() + self.schema_validator = SchemaValidator() + + def 
import_postman_collection(self, collection_path: str) -> str: + """ + Import a Postman collection file + + Args: + collection_path: Path to Postman collection JSON file + + Returns: + Collection ID for reference + """ + with open(collection_path, 'r') as f: + postman_data = json.load(f) + + collection = self._parse_postman_collection(postman_data) + collection_id = str(uuid.uuid4()) + self.collections[collection_id] = collection + + return collection_id + + def import_openapi_spec(self, spec_path: str) -> str: + """ + Import an OpenAPI specification file + + Args: + spec_path: Path to OpenAPI spec file (JSON or YAML) + + Returns: + Collection ID for reference + """ + with open(spec_path, 'r') as f: + if spec_path.endswith('.yaml') or spec_path.endswith('.yml'): + import yaml + openapi_data = yaml.safe_load(f) + else: + openapi_data = json.load(f) + + collection = self._parse_openapi_spec(openapi_data) + collection_id = str(uuid.uuid4()) + self.collections[collection_id] = collection + + return collection_id + + def generate_test_data_for_collection(self, collection_id: str, + prompt_template: str = None) -> Dict[str, Any]: + """ + Generate test data for all requests in a collection + + Args: + collection_id: ID of the collection + prompt_template: Template for generating prompts + + Returns: + Dictionary mapping request IDs to generated test data + """ + if collection_id not in self.collections: + raise ValueError(f"Collection {collection_id} not found") + + collection = self.collections[collection_id] + test_data = {} + + for request in collection.requests: + if request.test_data_schema: + prompt = prompt_template or f"Generate test data for {request.name} API endpoint" + + jsonformer = Jsonformer( + model_backend=self.model_backend, + json_schema=request.test_data_schema, + prompt=prompt + ) + + generated_data = jsonformer.generate_data() + test_data[request.id] = generated_data + + return test_data + + async def run_collection_tests(self, collection_id: str, + environment: str = "default", + parallel: bool = True, + max_concurrent: int = 5) -> List[APITestResult]: + """ + Execute all tests in a collection + + Args: + collection_id: ID of the collection to test + environment: Environment name for variable substitution + parallel: Whether to run tests in parallel + max_concurrent: Maximum concurrent requests when parallel=True + + Returns: + List of test results + """ + if collection_id not in self.collections: + raise ValueError(f"Collection {collection_id} not found") + + collection = self.collections[collection_id] + + # Generate test data for all requests + test_data = self.generate_test_data_for_collection(collection_id) + + if parallel: + semaphore = asyncio.Semaphore(max_concurrent) + tasks = [] + + for request in collection.requests: + task = self._run_single_test_async( + request, collection, test_data.get(request.id), semaphore + ) + tasks.append(task) + + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Filter out exceptions and convert to APITestResult + test_results = [] + for result in results: + if isinstance(result, APITestResult): + test_results.append(result) + elif isinstance(result, Exception): + # Create error result + test_results.append(APITestResult( + request_id="unknown", + name="Error", + success=False, + status_code=0, + response_time=0, + error_message=str(result) + )) + else: + test_results = [] + for request in collection.requests: + result = await self._run_single_test_async( + request, collection, test_data.get(request.id) + ) + 
test_results.append(result) + + self.test_results.extend(test_results) + return test_results + + async def _run_single_test_async(self, request: APITestRequest, + collection: APICollection, + test_data: Optional[Dict[str, Any]], + semaphore: Optional[asyncio.Semaphore] = None) -> APITestResult: + """Run a single API test asynchronously""" + if semaphore: + async with semaphore: + return await self._execute_request(request, collection, test_data) + else: + return await self._execute_request(request, collection, test_data) + + async def _execute_request(self, request: APITestRequest, + collection: APICollection, + test_data: Optional[Dict[str, Any]]) -> APITestResult: + """Execute a single API request and validate response""" + start_time = time.time() + + # Prepare URL with variable substitution + url = self._substitute_variables(request.url, collection.variables) + if not url.startswith('http'): + url = collection.base_url.rstrip('/') + '/' + url.lstrip('/') + + # Prepare headers + headers = request.headers.copy() + if collection.auth: + headers.update(self._prepare_auth_headers(collection.auth)) + + # Prepare body with test data + body = request.body + if test_data and request.method.upper() in ['POST', 'PUT', 'PATCH']: + body = test_data + + try: + async with aiohttp.ClientSession() as session: + async with session.request( + method=request.method, + url=url, + headers=headers, + json=body, + timeout=aiohttp.ClientTimeout(total=request.timeout) + ) as response: + response_time = time.time() - start_time + + try: + response_body = await response.json() + except: + response_body = await response.text() + + # Validate response + validation_errors = [] + success = response.status == request.expected_status + + if request.response_schema and isinstance(response_body, dict): + is_valid, errors = self.schema_validator.validate( + response_body, request.response_schema + ) + if not is_valid: + success = False + validation_errors.extend(errors) + + return APITestResult( + request_id=request.id, + name=request.name, + success=success, + status_code=response.status, + response_time=response_time, + response_body=response_body, + validation_errors=validation_errors + ) + + except Exception as e: + response_time = time.time() - start_time + return APITestResult( + request_id=request.id, + name=request.name, + success=False, + status_code=0, + response_time=response_time, + error_message=str(e) + ) + + def generate_test_report(self, format: str = "html", + output_file: Optional[str] = None) -> str: + """ + Generate a comprehensive test report + + Args: + format: Report format ('html', 'json', 'junit') + output_file: Optional output file path + + Returns: + Report content as string + """ + if format.lower() == "html": + report = self._generate_html_report() + elif format.lower() == "json": + report = self._generate_json_report() + elif format.lower() == "junit": + report = self._generate_junit_report() + else: + raise ValueError(f"Unsupported report format: {format}") + + if output_file: + with open(output_file, 'w') as f: + f.write(report) + + return report + + def _parse_postman_collection(self, postman_data: Dict[str, Any]) -> APICollection: + """Parse Postman collection format""" + info = postman_data.get('info', {}) + + collection = APICollection( + name=info.get('name', 'Untitled Collection'), + description=info.get('description', ''), + base_url='' + ) + + # Parse variables + if 'variable' in postman_data: + for var in postman_data['variable']: + collection.variables[var['key']] = 
var.get('value', '') + + # Parse items (requests) + items = postman_data.get('item', []) + for item in items: + request_data = item.get('request', {}) + if isinstance(request_data, dict): + request = self._parse_postman_request(item, request_data) + collection.requests.append(request) + + return collection + + def _parse_postman_request(self, item: Dict[str, Any], + request_data: Dict[str, Any]) -> APITestRequest: + """Parse a single Postman request""" + url_data = request_data.get('url', {}) + if isinstance(url_data, str): + url = url_data + else: + url = url_data.get('raw', '') + + headers = {} + if 'header' in request_data: + for header in request_data['header']: + if not header.get('disabled', False): + headers[header['key']] = header['value'] + + body = None + if 'body' in request_data: + body_data = request_data['body'] + if body_data.get('mode') == 'raw': + try: + body = json.loads(body_data.get('raw', '{}')) + except: + pass + + return APITestRequest( + id=str(uuid.uuid4()), + name=item.get('name', 'Untitled Request'), + method=request_data.get('method', 'GET'), + url=url, + headers=headers, + body=body + ) + + def _parse_openapi_spec(self, openapi_data: Dict[str, Any]) -> APICollection: + """Parse OpenAPI specification format""" + info = openapi_data.get('info', {}) + servers = openapi_data.get('servers', []) + base_url = servers[0]['url'] if servers else '' + + collection = APICollection( + name=info.get('title', 'OpenAPI Collection'), + description=info.get('description', ''), + base_url=base_url + ) + + paths = openapi_data.get('paths', {}) + for path, methods in paths.items(): + for method, operation in methods.items(): + if method.lower() in ['get', 'post', 'put', 'delete', 'patch']: + request = self._parse_openapi_operation(path, method, operation) + collection.requests.append(request) + + return collection + + def _parse_openapi_operation(self, path: str, method: str, + operation: Dict[str, Any]) -> APITestRequest: + """Parse a single OpenAPI operation""" + # Extract request body schema for test data generation + test_data_schema = None + request_body = operation.get('requestBody', {}) + if request_body: + content = request_body.get('content', {}) + json_content = content.get('application/json', {}) + test_data_schema = json_content.get('schema') + + # Extract response schema for validation + response_schema = None + responses = operation.get('responses', {}) + success_response = responses.get('200') or responses.get('201') + if success_response: + content = success_response.get('content', {}) + json_content = content.get('application/json', {}) + response_schema = json_content.get('schema') + + return APITestRequest( + id=str(uuid.uuid4()), + name=operation.get('operationId', f"{method.upper()} {path}"), + method=method.upper(), + url=path, + test_data_schema=test_data_schema, + response_schema=response_schema + ) + + def _substitute_variables(self, text: str, variables: Dict[str, str]) -> str: + """Substitute variables in text using {{variable}} format""" + for key, value in variables.items(): + text = text.replace(f"{{{{{key}}}}}", value) + return text + + def _prepare_auth_headers(self, auth: Dict[str, str]) -> Dict[str, str]: + """Prepare authentication headers""" + headers = {} + auth_type = auth.get('type', '') + + if auth_type == 'bearer': + token = auth.get('token', '') + headers['Authorization'] = f"Bearer {token}" + elif auth_type == 'basic': + import base64 + username = auth.get('username', '') + password = auth.get('password', '') + credentials = 
base64.b64encode(f"{username}:{password}".encode()).decode() + headers['Authorization'] = f"Basic {credentials}" + elif auth_type == 'apikey': + key = auth.get('key', '') + value = auth.get('value', '') + headers[key] = value + + return headers + + def _generate_html_report(self) -> str: + """Generate HTML test report""" + total_tests = len(self.test_results) + passed_tests = sum(1 for r in self.test_results if r.success) + failed_tests = total_tests - passed_tests + avg_response_time = sum(r.response_time for r in self.test_results) / total_tests if total_tests > 0 else 0 + + html = f""" + + + + JsonAI API Test Report + + + +

+            <h1>JsonAI API Test Report</h1>
+
+            <div class="summary">
+                <h2>Test Summary</h2>
+                <div class="metric">
+                    <div class="metric-value">{total_tests}</div>
+                    <div class="metric-label">Total Tests</div>
+                </div>
+                <div class="metric">
+                    <div class="metric-value">{passed_tests}</div>
+                    <div class="metric-label">Passed</div>
+                </div>
+                <div class="metric">
+                    <div class="metric-value">{failed_tests}</div>
+                    <div class="metric-label">Failed</div>
+                </div>
+                <div class="metric">
+                    <div class="metric-value">{avg_response_time:.2f}s</div>
+                    <div class="metric-label">Avg Response Time</div>
+                </div>
+            </div>
+
+            <h2>Test Results</h2>
+        """
+
+        for result in self.test_results:
+            status_class = "success" if result.success else "failure"
+            status_text = "PASS" if result.success else "FAIL"
+
+            html += f"""
+            <div class="result {status_class}">
+                <h3>{result.name} - {status_text}</h3>
+                <p>Status Code: {result.status_code}</p>
+                <p>Response Time: {result.response_time:.2f}s</p>
+                <p>Timestamp: {result.timestamp}</p>
+            """
+
+            if result.error_message:
+                html += f"<p class='error'>Error: {result.error_message}</p>"
+
+            if result.validation_errors:
+                html += "<p>Validation Errors:</p><ul>"
+                for error in result.validation_errors:
+                    html += f"<li>{error}</li>"
+                html += "</ul>"
+
+            html += "</div>
" + + html += """ + + + """ + + return html + + def _generate_json_report(self) -> str: + """Generate JSON test report""" + report_data = { + "summary": { + "total_tests": len(self.test_results), + "passed_tests": sum(1 for r in self.test_results if r.success), + "failed_tests": sum(1 for r in self.test_results if not r.success), + "avg_response_time": sum(r.response_time for r in self.test_results) / len(self.test_results) if self.test_results else 0 + }, + "results": [ + { + "request_id": r.request_id, + "name": r.name, + "success": r.success, + "status_code": r.status_code, + "response_time": r.response_time, + "error_message": r.error_message, + "validation_errors": r.validation_errors, + "timestamp": r.timestamp.isoformat() + } for r in self.test_results + ] + } + + return json.dumps(report_data, indent=2) + + def _generate_junit_report(self) -> str: + """Generate JUnit XML test report""" + total_tests = len(self.test_results) + failed_tests = sum(1 for r in self.test_results if not r.success) + total_time = sum(r.response_time for r in self.test_results) + + xml = f""" + +""" + + for result in self.test_results: + xml += f""" +""" + if not result.success: + error_msg = result.error_message or "Test failed" + xml += f""" +Status Code: {result.status_code} +Validation Errors: {', '.join(result.validation_errors)} + +""" + xml += """ +""" + + xml += """""" + + return xml + + +# CLI Integration +class APITestingCLI: + """CLI interface for API collection testing""" + + def __init__(self): + self.tester = APICollectionTester() + + def import_collection_command(self, source: str, file_path: str) -> str: + """CLI command to import API collection""" + if source.lower() == "postman": + return self.tester.import_postman_collection(file_path) + elif source.lower() == "openapi": + return self.tester.import_openapi_spec(file_path) + else: + raise ValueError(f"Unsupported collection source: {source}") + + async def run_tests_command(self, collection_id: str, environment: str = "default") -> List[APITestResult]: + """CLI command to run API tests""" + return await self.tester.run_collection_tests(collection_id, environment) + + def generate_report_command(self, format: str = "html", output: str = None) -> str: + """CLI command to generate test report""" + return self.tester.generate_test_report(format, output) \ No newline at end of file diff --git a/jsonAI/cli.py b/jsonAI/cli.py index 42857ca..be7260c 100644 --- a/jsonAI/cli.py +++ b/jsonAI/cli.py @@ -1,23 +1,46 @@ import click import json +import yaml +import asyncio +from pathlib import Path from jsonAI.main import Jsonformer, AsyncJsonformer -from jsonAI.model_backends import TransformersBackend, OllamaBackend +from jsonAI.model_backends import TransformersBackend, OllamaBackend, DummyBackend from jsonAI.schema_generator import SchemaGenerator from transformers import AutoModelForCausalLM, AutoTokenizer -import asyncio + +# Import enhanced features +try: + from .enhanced_workflow_engine import WorkflowCLI + from .api_collection_tester import APITestingCLI + ENHANCED_FEATURES = True +except ImportError: + ENHANCED_FEATURES = False + click.echo("Enhanced features not available. 
Some commands may be disabled.") @click.group() +@click.version_option() def cli(): + """JsonAI - Advanced JSON generation and workflow automation tool""" pass def initialize_backend(use_ollama, model, ollama_model): """Initialize the backend based on user options.""" if use_ollama: - return OllamaBackend(model_name=ollama_model) + try: + return OllamaBackend(model_name=ollama_model) + except Exception as e: + print(f"Warning: Could not initialize Ollama backend: {e}") + print("Falling back to DummyBackend for testing") + return DummyBackend() else: - tokenizer = AutoTokenizer.from_pretrained(model) - model = AutoModelForCausalLM.from_pretrained(model) - return TransformersBackend(model, tokenizer) + try: + tokenizer = AutoTokenizer.from_pretrained(model) + model = AutoModelForCausalLM.from_pretrained(model) + return TransformersBackend(model, tokenizer) + except Exception as e: + print(f"Warning: Could not initialize Transformers backend: {e}") + print("Falling back to DummyBackend for testing") + return DummyBackend() @cli.command() @click.option("--schema", type=click.File('r'), required=True, help="JSON schema file") @@ -27,7 +50,9 @@ def initialize_backend(use_ollama, model, ollama_model): @click.option("--ollama-model", default="qwen3:0.6b", help="Ollama model name") @click.option("--output-format", default="json", help="Output format (json, yaml, xml, csv)") @click.option("--async", "use_async", is_flag=True, help="Use async generation") -def generate(schema, prompt, model, use_ollama, ollama_model, output_format, use_async): +@click.option("--count", default=1, help="Number of items to generate") +@click.option("--output", type=click.Path(), help="Output file path") +def generate(schema, prompt, model, use_ollama, ollama_model, output_format, use_async, count, output): """Generate structured data from a schema and prompt""" try: json_schema = json.load(schema) @@ -37,31 +62,50 @@ def generate(schema, prompt, model, use_ollama, ollama_model, output_format, use backend = initialize_backend(use_ollama, model, ollama_model) - jsonformer = Jsonformer( - model_backend=backend, - json_schema=json_schema, - prompt=prompt, - output_format=output_format - ) + results = [] + for i in range(count): + current_prompt = prompt if count == 1 else f"{prompt} (item {i+1})" + + jsonformer = Jsonformer( + model_backend=backend, + json_schema=json_schema, + prompt=current_prompt, + output_format=output_format + ) - if use_async: - async_jsonformer = AsyncJsonformer(jsonformer) - result = asyncio.run(async_jsonformer()) - else: - result = jsonformer() + if use_async: + async_jsonformer = AsyncJsonformer(jsonformer) + result = asyncio.run(async_jsonformer()) + else: + result = jsonformer() + + results.append(result) - # Pretty-print dict/list, print primitives/null as-is - import sys - from jsonAI.schema_validator import SchemaValidator - if isinstance(result, (dict, list)): - click.echo(json.dumps(result, indent=2, ensure_ascii=False)) + # Prepare final output + final_result = results[0] if count == 1 else results + + # Format output + if output_format.lower() == 'json': + formatted_output = json.dumps(final_result, indent=2, ensure_ascii=False) + elif output_format.lower() == 'yaml': + formatted_output = yaml.dump(final_result, default_flow_style=False) + else: + formatted_output = str(final_result) + + # Output to file or console + if output: + with open(output, 'w') as f: + f.write(formatted_output) + click.echo(f"Output written to {output}") else: - click.echo(result) + click.echo(formatted_output) 
# Optional: Validate output and print warning if invalid try: + from jsonAI.schema_validator import SchemaValidator validator = SchemaValidator() - validator.validate(result, json_schema) + for result in results: + validator.validate(result, json_schema) except Exception as e: click.echo(f"[WARNING] Output does not validate against schema: {e}", err=True) @@ -84,7 +128,131 @@ def generate_schema(description, model, use_ollama, ollama_model): # Pretty-print schema click.echo(json.dumps(schema, indent=2)) -# (ensure two blank lines above entry point for lint) +# Workflow Management Commands (Enhanced Features) +if ENHANCED_FEATURES: + @cli.group() + def workflow(): + """Workflow management commands""" + pass + + @workflow.command() + @click.argument('config_file', type=click.Path(exists=True)) + def create(config_file): + """Create a workflow from configuration file""" + workflow_cli = WorkflowCLI() + workflow_name = workflow_cli.create_workflow_command(config_file) + click.echo(f"Workflow '{workflow_name}' created successfully") + + @workflow.command() + @click.argument('workflow_name') + @click.option('--variables', '-v', help='Runtime variables as JSON string') + @click.option('--variables-file', type=click.Path(exists=True), help='Variables from file') + def run(workflow_name, variables, variables_file): + """Run a workflow""" + workflow_cli = WorkflowCLI() + + # Parse variables + runtime_vars = {} + if variables: + runtime_vars.update(json.loads(variables)) + if variables_file: + with open(variables_file) as f: + if variables_file.endswith(('.yml', '.yaml')): + runtime_vars.update(yaml.safe_load(f)) + else: + runtime_vars.update(json.load(f)) + + async def run_workflow(): + execution_id = await workflow_cli.run_workflow_command(workflow_name, runtime_vars) + click.echo(f"Workflow execution started with ID: {execution_id}") + return execution_id + + asyncio.run(run_workflow()) + + @workflow.command() + def list(): + """List all available workflows""" + workflow_cli = WorkflowCLI() + workflows = workflow_cli.list_workflows_command() + if workflows: + click.echo("Available workflows:") + for workflow in workflows: + click.echo(f" - {workflow}") + else: + click.echo("No workflows found") + + @workflow.command() + @click.argument('execution_id') + def status(execution_id): + """Get workflow execution status""" + workflow_cli = WorkflowCLI() + execution = workflow_cli.status_command(execution_id) + if execution: + click.echo(f"Execution ID: {execution.id}") + click.echo(f"Workflow: {execution.workflow_name}") + click.echo(f"Status: {execution.status.value}") + click.echo(f"Start Time: {execution.start_time}") + if execution.end_time: + click.echo(f"End Time: {execution.end_time}") + if execution.error_message: + click.echo(f"Error: {execution.error_message}") + else: + click.echo(f"Execution '{execution_id}' not found") + + # API Testing Commands (Enhanced Features) + @cli.group() + def test(): + """API testing commands""" + pass + + @test.command('import-collection') + @click.option('--source', type=click.Choice(['postman', 'openapi']), required=True, help='Collection source') + @click.argument('file', type=click.Path(exists=True)) + def import_collection(source, file): + """Import API collection for testing""" + api_cli = APITestingCLI() + collection_id = api_cli.import_collection_command(source, file) + click.echo(f"Collection imported with ID: {collection_id}") + + @test.command('run-collection') + @click.argument('collection_id') + @click.option('--environment', default='default', 
help='Environment to use') + def run_collection(collection_id, environment): + """Run API collection tests""" + api_cli = APITestingCLI() + + async def run_tests(): + results = await api_cli.run_tests_command(collection_id, environment) + + total_tests = len(results) + passed_tests = sum(1 for r in results if r.success) + failed_tests = total_tests - passed_tests + + click.echo(f"API Test Results:") + click.echo(f" Total: {total_tests}") + click.echo(f" Passed: {passed_tests}") + click.echo(f" Failed: {failed_tests}") + + if failed_tests > 0: + click.echo("\nFailed tests:") + for result in results: + if not result.success: + click.echo(f" - {result.name}: {result.error_message}") + + asyncio.run(run_tests()) + + @test.command('report') + @click.option('--format', 'report_format', type=click.Choice(['html', 'json', 'junit']), default='html', help='Report format') + @click.option('--output', type=click.Path(), help='Output file') + def report(report_format, output): + """Generate test report""" + api_cli = APITestingCLI() + report_content = api_cli.generate_report_command(report_format, output) + + if output: + click.echo(f"Report generated: {output}") + else: + click.echo(report_content[:1000] + "..." if len(report_content) > 1000 else report_content) # Entry point diff --git a/jsonAI/enhanced_cli.py b/jsonAI/enhanced_cli.py new file mode 100644 index 0000000..2aa056c --- /dev/null +++ b/jsonAI/enhanced_cli.py @@ -0,0 +1,544 @@ +""" +Enhanced CLI for JsonAI with workflow automation and API testing capabilities +""" + +import asyncio +import json +import click +import yaml +from pathlib import Path +from typing import Dict, Any, Optional + +from .enhanced_workflow_engine import WorkflowCLI +from .api_collection_tester import APITestingCLI + + +@click.group() +@click.version_option() +def cli(): + """JsonAI - Automated workflow and API testing tool""" + pass + + +# Workflow Management Commands +@cli.group() +def workflow(): + """Workflow management commands""" + pass + + +@workflow.command() +@click.argument('config_file', type=click.Path(exists=True)) +def create(config_file): + """Create a workflow from configuration file""" + workflow_cli = WorkflowCLI() + workflow_name = workflow_cli.create_workflow_command(config_file) + click.echo(f"Workflow '{workflow_name}' created successfully") + + +@workflow.command() +@click.argument('workflow_name') +@click.option('--variables', '-v', help='Runtime variables as JSON string') +@click.option('--variables-file', type=click.Path(exists=True), help='Variables from file') +def run(workflow_name, variables, variables_file): + """Run a workflow""" + workflow_cli = WorkflowCLI() + + # Parse variables + runtime_vars = {} + if variables: + runtime_vars.update(json.loads(variables)) + if variables_file: + with open(variables_file) as f: + if variables_file.endswith(('.yml', '.yaml')): + runtime_vars.update(yaml.safe_load(f)) + else: + runtime_vars.update(json.load(f)) + + async def run_workflow(): + execution_id = await workflow_cli.run_workflow_command(workflow_name, runtime_vars) + click.echo(f"Workflow execution started with ID: {execution_id}") + return execution_id + + asyncio.run(run_workflow()) + + +@workflow.command() +def list(): + """List all available workflows""" + workflow_cli = WorkflowCLI() + workflows = workflow_cli.list_workflows_command() + if workflows: + click.echo("Available workflows:") + for workflow in workflows: + click.echo(f" - {workflow}") + else: + click.echo("No workflows found") + + +@workflow.command() 
+@click.argument('execution_id') +def status(execution_id): + """Get workflow execution status""" + workflow_cli = WorkflowCLI() + execution = workflow_cli.status_command(execution_id) + if execution: + click.echo(f"Execution ID: {execution.id}") + click.echo(f"Workflow: {execution.workflow_name}") + click.echo(f"Status: {execution.status.value}") + click.echo(f"Start Time: {execution.start_time}") + if execution.end_time: + click.echo(f"End Time: {execution.end_time}") + if execution.error_message: + click.echo(f"Error: {execution.error_message}") + else: + click.echo(f"Execution '{execution_id}' not found") + + +@workflow.command() +@click.argument('workflow_name') +@click.option('--type', 'schedule_type', default='daily', help='Schedule type (daily, interval, cron)') +@click.option('--time', help='Time for daily schedule (HH:MM)') +@click.option('--interval', type=int, help='Interval in seconds for interval schedule') +@click.option('--cron', help='Cron expression for cron schedule') +def schedule(workflow_name, schedule_type, time, interval, cron): + """Schedule a workflow for execution""" + workflow_cli = WorkflowCLI() + + schedule_config = {'type': schedule_type} + if schedule_type == 'daily' and time: + schedule_config['time'] = time + elif schedule_type == 'interval' and interval: + schedule_config['interval_seconds'] = interval + elif schedule_type == 'cron' and cron: + schedule_config['cron'] = cron + + schedule_id = workflow_cli.schedule_workflow_command(workflow_name, schedule_config) + click.echo(f"Workflow '{workflow_name}' scheduled with ID: {schedule_id}") + + +# API Testing Commands +@cli.group() +def test(): + """API testing commands""" + pass + + +@test.command('import-collection') +@click.option('--source', type=click.Choice(['postman', 'openapi']), required=True, help='Collection source') +@click.argument('file', type=click.Path(exists=True)) +def import_collection(source, file): + """Import API collection for testing""" + api_cli = APITestingCLI() + collection_id = api_cli.import_collection_command(source, file) + click.echo(f"Collection imported with ID: {collection_id}") + + +@test.command('run-collection') +@click.argument('collection_id') +@click.option('--environment', default='default', help='Environment to use') +@click.option('--parallel/--sequential', default=True, help='Run tests in parallel') +@click.option('--max-concurrent', type=int, default=5, help='Maximum concurrent requests') +def run_collection(collection_id, environment, parallel, max_concurrent): + """Run API collection tests""" + api_cli = APITestingCLI() + + async def run_tests(): + results = await api_cli.run_tests_command(collection_id, environment) + + total_tests = len(results) + passed_tests = sum(1 for r in results if r.success) + failed_tests = total_tests - passed_tests + + click.echo(f"API Test Results:") + click.echo(f" Total: {total_tests}") + click.echo(f" Passed: {passed_tests}") + click.echo(f" Failed: {failed_tests}") + + if failed_tests > 0: + click.echo("\nFailed tests:") + for result in results: + if not result.success: + click.echo(f" - {result.name}: {result.error_message}") + + asyncio.run(run_tests()) + + +@test.command('generate-data') +@click.option('--schema', type=click.Path(exists=True), help='JSON schema file') +@click.option('--count', type=int, default=1, help='Number of data items to generate') +@click.option('--output', type=click.Path(), help='Output file') +@click.option('--format', 'output_format', type=click.Choice(['json', 'yaml', 'csv']), default='json', 
help='Output format') +def generate_data(schema, count, output, output_format): + """Generate test data based on schema""" + if not schema: + click.echo("Schema file is required") + return + + with open(schema) as f: + schema_data = json.load(f) + + from .main import Jsonformer + from .model_backends import DummyBackend + + backend = DummyBackend() + generated_data = [] + + for i in range(count): + jsonformer = Jsonformer( + model_backend=backend, + json_schema=schema_data, + prompt=f"Generate test data item {i+1}", + output_format=output_format + ) + data = jsonformer.generate_data() + generated_data.append(data) + + if count == 1: + result = generated_data[0] + else: + result = generated_data + + if output: + with open(output, 'w') as f: + if output_format == 'json': + json.dump(result, f, indent=2) + elif output_format == 'yaml': + yaml.dump(result, f, default_flow_style=False) + else: + f.write(str(result)) + click.echo(f"Generated data written to {output}") + else: + if output_format == 'json': + click.echo(json.dumps(result, indent=2)) + elif output_format == 'yaml': + click.echo(yaml.dump(result, default_flow_style=False)) + else: + click.echo(result) + + +@test.command('report') +@click.option('--format', 'report_format', type=click.Choice(['html', 'json', 'junit']), default='html', help='Report format') +@click.option('--output', type=click.Path(), help='Output file') +def report(report_format, output): + """Generate test report""" + api_cli = APITestingCLI() + report_content = api_cli.generate_report_command(report_format, output) + + if output: + click.echo(f"Report generated: {output}") + else: + click.echo(report_content) + + +# Data Pipeline Commands +@cli.group() +def data(): + """Data pipeline commands""" + pass + + +@data.command('validate') +@click.argument('data_file', type=click.Path(exists=True)) +@click.argument('schema_file', type=click.Path(exists=True)) +def validate(data_file, schema_file): + """Validate data against schema""" + with open(data_file) as f: + data = json.load(f) + + with open(schema_file) as f: + schema = json.load(f) + + from .schema_validator import SchemaValidator + validator = SchemaValidator() + is_valid, errors = validator.validate(data, schema) + + if is_valid: + click.echo("✓ Data is valid") + else: + click.echo("✗ Data validation failed:") + for error in errors: + click.echo(f" - {error}") + + +@data.command('transform') +@click.argument('data_file', type=click.Path(exists=True)) +@click.argument('config_file', type=click.Path(exists=True)) +@click.option('--output', type=click.Path(), help='Output file') +def transform(data_file, config_file, output): + """Transform data using configuration""" + with open(data_file) as f: + data = json.load(f) + + with open(config_file) as f: + config = yaml.safe_load(f) + + # Simple transformation implementation + transformations = config.get('transformations', []) + result = data + + for transform in transformations: + if transform['type'] == 'filter': + # Filter implementation + pass + elif transform['type'] == 'map': + # Map implementation + pass + + if output: + with open(output, 'w') as f: + json.dump(result, f, indent=2) + click.echo(f"Transformed data written to {output}") + else: + click.echo(json.dumps(result, indent=2)) + + +# Integration Commands +@cli.group() +def integration(): + """Integration management commands""" + pass + + +@integration.command('github') +@click.option('--token', required=True, help='GitHub personal access token') +@click.option('--repo', help='Repository in format 
owner/repo') +def github(token, repo): + """Setup GitHub integration""" + from .integration_hub import IntegrationHub + + async def setup_github(): + hub = IntegrationHub() + await hub.github_integration(token=token, repo=repo) + click.echo("GitHub integration configured") + + asyncio.run(setup_github()) + + +@integration.command('slack') +@click.option('--webhook-url', required=True, help='Slack webhook URL') +def slack(webhook_url): + """Setup Slack integration""" + from .integration_hub import IntegrationHub + + async def setup_slack(): + hub = IntegrationHub() + await hub.slack_integration(webhook_url=webhook_url) + click.echo("Slack integration configured") + + asyncio.run(setup_slack()) + + +# Server Commands +@cli.group() +def server(): + """Server management commands""" + pass + + +@server.command('start') +@click.option('--host', default='localhost', help='Host to bind to') +@click.option('--port', default=8000, type=int, help='Port to bind to') +@click.option('--workers', default=1, type=int, help='Number of worker processes') +def start(host, port, workers): + """Start the JsonAI API server""" + import uvicorn + from .api import app + + click.echo(f"Starting JsonAI API server on {host}:{port}") + uvicorn.run( + "jsonAI.api:app", + host=host, + port=port, + workers=workers, + reload=False + ) + + +# Utility Commands +@cli.group() +def utils(): + """Utility commands""" + pass + + +@utils.command('init') +@click.argument('project_name') +@click.option('--type', 'project_type', type=click.Choice(['workflow', 'api-testing', 'full']), default='full', help='Project type') +def init(project_name, project_type): + """Initialize a new JsonAI project""" + project_path = Path(project_name) + project_path.mkdir(exist_ok=True) + + # Create basic project structure + (project_path / 'workflows').mkdir(exist_ok=True) + (project_path / 'collections').mkdir(exist_ok=True) + (project_path / 'schemas').mkdir(exist_ok=True) + (project_path / 'config').mkdir(exist_ok=True) + + # Create example files + if project_type in ['workflow', 'full']: + example_workflow = { + 'name': 'example_workflow', + 'version': '1.0', + 'description': 'Example workflow for JsonAI', + 'variables': { + 'api_base_url': 'https://api.example.com' + }, + 'steps': [ + { + 'id': 'generate_user_data', + 'name': 'Generate User Data', + 'type': 'json_generation', + 'config': { + 'schema': { + 'type': 'object', + 'properties': { + 'name': {'type': 'string'}, + 'email': {'type': 'string', 'format': 'email'}, + 'age': {'type': 'integer', 'minimum': 18} + } + }, + 'prompt': 'Generate a realistic user profile', + 'output_variable': 'user_data' + } + }, + { + 'id': 'send_api_request', + 'name': 'Send API Request', + 'type': 'api_request', + 'depends_on': ['generate_user_data'], + 'config': { + 'method': 'POST', + 'url': '${api_base_url}/users', + 'headers': { + 'Content-Type': 'application/json' + }, + 'body': '${user_data}', + 'output_variable': 'api_response' + } + } + ] + } + + with open(project_path / 'workflows' / 'example_workflow.yaml', 'w') as f: + yaml.dump(example_workflow, f, default_flow_style=False) + + if project_type in ['api-testing', 'full']: + example_schema = { + 'type': 'object', + 'properties': { + 'id': {'type': 'integer'}, + 'name': {'type': 'string'}, + 'email': {'type': 'string', 'format': 'email'}, + 'created_at': {'type': 'string', 'format': 'date-time'} + }, + 'required': ['name', 'email'] + } + + with open(project_path / 'schemas' / 'user_schema.json', 'w') as f: + json.dump(example_schema, f, indent=2) + + # 
Create config file + config = { + 'project_name': project_name, + 'version': '1.0.0', + 'description': f'{project_name} JsonAI project', + 'settings': { + 'model_backend': 'dummy', + 'default_output_format': 'json', + 'max_retries': 3, + 'timeout': 300 + } + } + + with open(project_path / 'config' / 'project.yaml', 'w') as f: + yaml.dump(config, f, default_flow_style=False) + + # Create README + readme_content = f"""# {project_name} + +JsonAI project for workflow automation and API testing. + +## Structure + +- `workflows/` - Workflow definitions +- `collections/` - API collections (Postman, OpenAPI) +- `schemas/` - JSON schemas for data generation +- `config/` - Project configuration + +## Quick Start + +1. Run example workflow: + ```bash + jsonai workflow run example_workflow + ``` + +2. Generate test data: + ```bash + jsonai test generate-data --schema schemas/user_schema.json --count 10 + ``` + +3. Start API server: + ```bash + jsonai server start + ``` + +## Documentation + +Visit the [JsonAI documentation](https://github.com/kishoretvk/jsonAI) for more information. +""" + + with open(project_path / 'README.md', 'w') as f: + f.write(readme_content) + + click.echo(f"✓ Project '{project_name}' initialized successfully") + click.echo(f" Directory: {project_path.absolute()}") + click.echo(f" Type: {project_type}") + + +@utils.command('validate-workflow') +@click.argument('workflow_file', type=click.Path(exists=True)) +def validate_workflow(workflow_file): + """Validate workflow configuration file""" + try: + with open(workflow_file) as f: + if workflow_file.endswith(('.yml', '.yaml')): + workflow_data = yaml.safe_load(f) + else: + workflow_data = json.load(f) + + # Basic validation + required_fields = ['name', 'version', 'steps'] + for field in required_fields: + if field not in workflow_data: + click.echo(f"✗ Missing required field: {field}") + return + + steps = workflow_data.get('steps', []) + if not steps: + click.echo("✗ Workflow must have at least one step") + return + + step_ids = set() + for step in steps: + if 'id' not in step: + click.echo("✗ Step missing required 'id' field") + return + if step['id'] in step_ids: + click.echo(f"✗ Duplicate step ID: {step['id']}") + return + step_ids.add(step['id']) + + if 'type' not in step: + click.echo(f"✗ Step '{step['id']}' missing required 'type' field") + return + + click.echo("✓ Workflow configuration is valid") + + except Exception as e: + click.echo(f"✗ Error validating workflow: {e}") + + +if __name__ == '__main__': + cli() \ No newline at end of file diff --git a/jsonAI/enhanced_workflow_engine.py b/jsonAI/enhanced_workflow_engine.py new file mode 100644 index 0000000..6e9f1e9 --- /dev/null +++ b/jsonAI/enhanced_workflow_engine.py @@ -0,0 +1,817 @@ +""" +Enhanced Workflow Automation Framework for JsonAI + +This module provides comprehensive workflow automation capabilities including: +- YAML/JSON workflow definitions +- Conditional branching and loops +- Parallel execution +- Error handling and retry mechanisms +- Workflow scheduling +- Progress monitoring and reporting +""" + +import asyncio +import json +import time +import uuid +from datetime import datetime, timedelta +from enum import Enum +from typing import Dict, List, Any, Optional, Union, Callable +from dataclasses import dataclass, field +import yaml +from pathlib import Path + +from .main import Jsonformer +from .api_collection_tester import APICollectionTester, APITestResult +from .model_backends import DummyBackend + + +class WorkflowStepType(Enum): + """Types of 
workflow steps""" + JSON_GENERATION = "json_generation" + API_REQUEST = "api_request" + API_TEST_COLLECTION = "api_test_collection" + DATA_VALIDATION = "data_validation" + DATA_TRANSFORMATION = "data_transformation" + CONDITIONAL = "conditional" + LOOP = "loop" + PARALLEL = "parallel" + DELAY = "delay" + SCRIPT = "script" + NOTIFICATION = "notification" + CLEANUP = "cleanup" + + +class WorkflowStatus(Enum): + """Workflow execution status""" + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + PAUSED = "paused" + + +@dataclass +class WorkflowStep: + """Represents a single workflow step""" + id: str + name: str + type: WorkflowStepType + config: Dict[str, Any] = field(default_factory=dict) + depends_on: List[str] = field(default_factory=list) + parallel: bool = False + retry_count: int = 0 + timeout: int = 300 # 5 minutes default + on_failure: str = "stop" # stop, continue, retry, fallback + fallback_step: Optional[str] = None + conditions: Optional[Dict[str, Any]] = None + + +@dataclass +class WorkflowStepResult: + """Result of a workflow step execution""" + step_id: str + status: WorkflowStatus + output: Any = None + error: Optional[str] = None + start_time: Optional[datetime] = None + end_time: Optional[datetime] = None + execution_time: float = 0.0 + retry_count: int = 0 + + +@dataclass +class WorkflowDefinition: + """Complete workflow definition""" + name: str + version: str + description: str = "" + variables: Dict[str, Any] = field(default_factory=dict) + steps: List[WorkflowStep] = field(default_factory=list) + error_handling: Dict[str, Any] = field(default_factory=dict) + schedule: Optional[Dict[str, Any]] = None + notifications: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class WorkflowExecution: + """Represents a workflow execution instance""" + id: str + workflow_name: str + status: WorkflowStatus + start_time: datetime + end_time: Optional[datetime] = None + step_results: Dict[str, WorkflowStepResult] = field(default_factory=dict) + variables: Dict[str, Any] = field(default_factory=dict) + error_message: Optional[str] = None + + +class WorkflowEngine: + """ + Enhanced workflow execution engine with advanced features + """ + + def __init__(self, model_backend=None): + self.model_backend = model_backend or DummyBackend() + self.api_tester = APICollectionTester(self.model_backend) + self.workflows: Dict[str, WorkflowDefinition] = {} + self.executions: Dict[str, WorkflowExecution] = {} + self.step_executors: Dict[WorkflowStepType, Callable] = { + WorkflowStepType.JSON_GENERATION: self._execute_json_generation, + WorkflowStepType.API_REQUEST: self._execute_api_request, + WorkflowStepType.API_TEST_COLLECTION: self._execute_api_test_collection, + WorkflowStepType.DATA_VALIDATION: self._execute_data_validation, + WorkflowStepType.DATA_TRANSFORMATION: self._execute_data_transformation, + WorkflowStepType.CONDITIONAL: self._execute_conditional, + WorkflowStepType.LOOP: self._execute_loop, + WorkflowStepType.PARALLEL: self._execute_parallel, + WorkflowStepType.DELAY: self._execute_delay, + WorkflowStepType.SCRIPT: self._execute_script, + WorkflowStepType.NOTIFICATION: self._execute_notification, + WorkflowStepType.CLEANUP: self._execute_cleanup, + } + + def load_workflow_from_file(self, file_path: str) -> str: + """ + Load workflow definition from YAML or JSON file + + Args: + file_path: Path to workflow definition file + + Returns: + Workflow ID for reference + """ + with open(file_path, 'r') as f: + if 
file_path.endswith(('.yml', '.yaml')): + workflow_data = yaml.safe_load(f) + else: + workflow_data = json.load(f) + + workflow = self._parse_workflow_definition(workflow_data) + self.workflows[workflow.name] = workflow + + return workflow.name + + def create_workflow(self, workflow_data: Dict[str, Any]) -> str: + """ + Create workflow from dictionary definition + + Args: + workflow_data: Workflow definition as dictionary + + Returns: + Workflow ID for reference + """ + workflow = self._parse_workflow_definition(workflow_data) + self.workflows[workflow.name] = workflow + return workflow.name + + async def execute_workflow(self, workflow_name: str, + variables: Optional[Dict[str, Any]] = None) -> str: + """ + Execute a workflow + + Args: + workflow_name: Name of the workflow to execute + variables: Runtime variables to override defaults + + Returns: + Execution ID for tracking + """ + if workflow_name not in self.workflows: + raise ValueError(f"Workflow '{workflow_name}' not found") + + workflow = self.workflows[workflow_name] + execution_id = str(uuid.uuid4()) + + # Initialize execution + execution = WorkflowExecution( + id=execution_id, + workflow_name=workflow_name, + status=WorkflowStatus.PENDING, + start_time=datetime.now(), + variables={**workflow.variables, **(variables or {})} + ) + + self.executions[execution_id] = execution + + try: + # Start execution + execution.status = WorkflowStatus.RUNNING + + # Build dependency graph + dependency_graph = self._build_dependency_graph(workflow.steps) + + # Execute steps in dependency order + await self._execute_workflow_steps(workflow, execution, dependency_graph) + + execution.status = WorkflowStatus.COMPLETED + execution.end_time = datetime.now() + + except Exception as e: + execution.status = WorkflowStatus.FAILED + execution.error_message = str(e) + execution.end_time = datetime.now() + + # Handle workflow-level error + await self._handle_workflow_error(workflow, execution, e) + + return execution_id + + def get_execution_status(self, execution_id: str) -> Optional[WorkflowExecution]: + """Get execution status and results""" + return self.executions.get(execution_id) + + def cancel_execution(self, execution_id: str) -> bool: + """Cancel a running workflow execution""" + if execution_id in self.executions: + execution = self.executions[execution_id] + if execution.status == WorkflowStatus.RUNNING: + execution.status = WorkflowStatus.CANCELLED + execution.end_time = datetime.now() + return True + return False + + def list_workflows(self) -> List[str]: + """List all available workflows""" + return list(self.workflows.keys()) + + def list_executions(self, workflow_name: Optional[str] = None) -> List[WorkflowExecution]: + """List workflow executions, optionally filtered by workflow name""" + executions = list(self.executions.values()) + if workflow_name: + executions = [e for e in executions if e.workflow_name == workflow_name] + return executions + + def _parse_workflow_definition(self, workflow_data: Dict[str, Any]) -> WorkflowDefinition: + """Parse workflow definition from dictionary""" + steps = [] + for step_data in workflow_data.get('steps', []): + step = WorkflowStep( + id=step_data['id'], + name=step_data.get('name', step_data['id']), + type=WorkflowStepType(step_data['type']), + config=step_data.get('config', {}), + depends_on=step_data.get('depends_on', []), + parallel=step_data.get('parallel', False), + retry_count=step_data.get('retry_count', 0), + timeout=step_data.get('timeout', 300), + on_failure=step_data.get('on_failure', 
'stop'), + fallback_step=step_data.get('fallback_step'), + conditions=step_data.get('conditions') + ) + steps.append(step) + + return WorkflowDefinition( + name=workflow_data['name'], + version=workflow_data.get('version', '1.0'), + description=workflow_data.get('description', ''), + variables=workflow_data.get('variables', {}), + steps=steps, + error_handling=workflow_data.get('error_handling', {}), + schedule=workflow_data.get('schedule'), + notifications=workflow_data.get('notifications', {}) + ) + + def _build_dependency_graph(self, steps: List[WorkflowStep]) -> Dict[str, List[str]]: + """Build dependency graph for steps""" + graph = {} + step_map = {step.id: step for step in steps} + + for step in steps: + graph[step.id] = [] + for dep in step.depends_on: + if dep in step_map: + graph[step.id].append(dep) + + return graph + + async def _execute_workflow_steps(self, workflow: WorkflowDefinition, + execution: WorkflowExecution, + dependency_graph: Dict[str, List[str]]): + """Execute workflow steps respecting dependencies""" + completed_steps = set() + step_map = {step.id: step for step in workflow.steps} + + while len(completed_steps) < len(workflow.steps): + # Find steps ready to execute (all dependencies completed) + ready_steps = [] + for step_id, deps in dependency_graph.items(): + if step_id not in completed_steps and all(dep in completed_steps for dep in deps): + step = step_map[step_id] + + # Check conditions if any + if self._check_step_conditions(step, execution): + ready_steps.append(step) + else: + completed_steps.add(step_id) # Skip step due to conditions + + if not ready_steps: + break # No more steps can be executed + + # Group parallel steps + parallel_steps = [s for s in ready_steps if s.parallel] + sequential_steps = [s for s in ready_steps if not s.parallel] + + # Execute parallel steps concurrently + if parallel_steps: + tasks = [self._execute_single_step(step, execution) for step in parallel_steps] + results = await asyncio.gather(*tasks, return_exceptions=True) + + for step, result in zip(parallel_steps, results): + if isinstance(result, Exception): + execution.step_results[step.id] = WorkflowStepResult( + step_id=step.id, + status=WorkflowStatus.FAILED, + error=str(result) + ) + await self._handle_step_error(step, execution, result) + else: + completed_steps.add(step.id) + + # Execute sequential steps one by one + for step in sequential_steps: + try: + await self._execute_single_step(step, execution) + completed_steps.add(step.id) + except Exception as e: + await self._handle_step_error(step, execution, e) + if step.on_failure == 'stop': + raise + + async def _execute_single_step(self, step: WorkflowStep, execution: WorkflowExecution): + """Execute a single workflow step""" + result = WorkflowStepResult( + step_id=step.id, + status=WorkflowStatus.RUNNING, + start_time=datetime.now() + ) + + execution.step_results[step.id] = result + + try: + # Execute step with timeout + executor = self.step_executors.get(step.type) + if not executor: + raise ValueError(f"No executor found for step type: {step.type}") + + output = await asyncio.wait_for( + executor(step, execution), + timeout=step.timeout + ) + + result.output = output + result.status = WorkflowStatus.COMPLETED + result.end_time = datetime.now() + result.execution_time = (result.end_time - result.start_time).total_seconds() + + except asyncio.TimeoutError: + result.status = WorkflowStatus.FAILED + result.error = f"Step timed out after {step.timeout} seconds" + result.end_time = datetime.now() + raise + except 
Exception as e: + result.status = WorkflowStatus.FAILED + result.error = str(e) + result.end_time = datetime.now() + raise + + def _check_step_conditions(self, step: WorkflowStep, execution: WorkflowExecution) -> bool: + """Check if step conditions are met""" + if not step.conditions: + return True + + # Simple condition evaluation (can be extended) + for condition_type, condition_value in step.conditions.items(): + if condition_type == "variable_equals": + var_name, expected_value = condition_value.split('=', 1) + if execution.variables.get(var_name) != expected_value: + return False + elif condition_type == "step_success": + step_result = execution.step_results.get(condition_value) + if not step_result or step_result.status != WorkflowStatus.COMPLETED: + return False + + return True + + async def _handle_step_error(self, step: WorkflowStep, execution: WorkflowExecution, error: Exception): + """Handle step execution error""" + result = execution.step_results[step.id] + + if step.retry_count > result.retry_count: + # Retry the step + result.retry_count += 1 + await asyncio.sleep(2 ** result.retry_count) # Exponential backoff + await self._execute_single_step(step, execution) + elif step.fallback_step: + # Execute fallback step + fallback_step = next((s for s in self.workflows[execution.workflow_name].steps + if s.id == step.fallback_step), None) + if fallback_step: + await self._execute_single_step(fallback_step, execution) + elif step.on_failure == 'continue': + # Continue with next steps + pass + else: + # Stop workflow + raise error + + async def _handle_workflow_error(self, workflow: WorkflowDefinition, + execution: WorkflowExecution, error: Exception): + """Handle workflow-level error""" + # Send notifications if configured + if workflow.notifications.get('on_failure'): + await self._send_notification( + workflow.notifications['on_failure'], + f"Workflow '{workflow.name}' failed: {str(error)}" + ) + + # Step Executors + + async def _execute_json_generation(self, step: WorkflowStep, execution: WorkflowExecution) -> Dict[str, Any]: + """Execute JSON generation step""" + config = step.config + schema = config.get('schema', {}) + prompt = config.get('prompt', '') + + # Substitute variables in prompt + prompt = self._substitute_variables(prompt, execution.variables) + + jsonformer = Jsonformer( + model_backend=self.model_backend, + json_schema=schema, + prompt=prompt, + **config.get('jsonformer_options', {}) + ) + + result = jsonformer.generate_data() + + # Store result in execution variables for use by other steps + var_name = config.get('output_variable', f"{step.id}_output") + execution.variables[var_name] = result + + return result + + async def _execute_api_request(self, step: WorkflowStep, execution: WorkflowExecution) -> Dict[str, Any]: + """Execute API request step""" + import aiohttp + + config = step.config + method = config.get('method', 'GET') + url = self._substitute_variables(config.get('url', ''), execution.variables) + headers = config.get('headers', {}) + body = config.get('body') + + # Substitute variables in body if it's a string template + if isinstance(body, str): + body = self._substitute_variables(body, execution.variables) + + async with aiohttp.ClientSession() as session: + async with session.request(method, url, headers=headers, json=body) as response: + result = { + 'status_code': response.status, + 'headers': dict(response.headers), + 'body': await response.json() if response.content_type == 'application/json' else await response.text() + } + + # Store result in 
execution variables + var_name = config.get('output_variable', f"{step.id}_output") + execution.variables[var_name] = result + + return result + + async def _execute_api_test_collection(self, step: WorkflowStep, execution: WorkflowExecution) -> List[APITestResult]: + """Execute API test collection step""" + config = step.config + collection_path = config.get('collection_path') + collection_type = config.get('collection_type', 'postman') + environment = config.get('environment', 'default') + + # Import collection + if collection_type == 'postman': + collection_id = self.api_tester.import_postman_collection(collection_path) + elif collection_type == 'openapi': + collection_id = self.api_tester.import_openapi_spec(collection_path) + else: + raise ValueError(f"Unsupported collection type: {collection_type}") + + # Run tests + results = await self.api_tester.run_collection_tests(collection_id, environment) + + # Store results in execution variables + var_name = config.get('output_variable', f"{step.id}_output") + execution.variables[var_name] = results + + return results + + async def _execute_data_validation(self, step: WorkflowStep, execution: WorkflowExecution) -> Dict[str, Any]: + """Execute data validation step""" + config = step.config + data_variable = config.get('data_variable') + schema = config.get('schema') + + if not data_variable or data_variable not in execution.variables: + raise ValueError(f"Data variable '{data_variable}' not found") + + data = execution.variables[data_variable] + + from .schema_validator import SchemaValidator + validator = SchemaValidator() + is_valid, errors = validator.validate(data, schema) + + result = { + 'valid': is_valid, + 'errors': errors, + 'data': data + } + + if not is_valid and config.get('fail_on_invalid', True): + raise ValueError(f"Data validation failed: {errors}") + + return result + + async def _execute_data_transformation(self, step: WorkflowStep, execution: WorkflowExecution) -> Any: + """Execute data transformation step""" + config = step.config + input_variable = config.get('input_variable') + transformation = config.get('transformation', {}) + + if not input_variable or input_variable not in execution.variables: + raise ValueError(f"Input variable '{input_variable}' not found") + + data = execution.variables[input_variable] + + # Simple transformation operations + if transformation.get('type') == 'filter': + # Filter array based on condition + condition = transformation.get('condition', {}) + if isinstance(data, list): + data = [item for item in data if self._evaluate_condition(item, condition)] + elif transformation.get('type') == 'map': + # Transform each item in array + mapping = transformation.get('mapping', {}) + if isinstance(data, list): + data = [self._apply_mapping(item, mapping) for item in data] + elif transformation.get('type') == 'aggregate': + # Aggregate data + operation = transformation.get('operation') + if operation == 'count' and isinstance(data, list): + data = len(data) + elif operation == 'sum' and isinstance(data, list): + data = sum(data) + + # Store result + output_variable = config.get('output_variable', f"{step.id}_output") + execution.variables[output_variable] = data + + return data + + async def _execute_conditional(self, step: WorkflowStep, execution: WorkflowExecution) -> str: + """Execute conditional step""" + config = step.config + condition = config.get('condition', {}) + + if self._evaluate_condition(execution.variables, condition): + return "condition_met" + else: + return "condition_not_met" + + 
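+    # Illustrative note (a sketch, not part of the engine API): a "conditional"
+    # step is driven entirely by config["condition"], which is evaluated against
+    # the execution variables by _evaluate_condition defined further below. The
+    # step id and field/value names here are hypothetical examples only:
+    #
+    #   - id: "check_status"
+    #     type: "conditional"
+    #     config:
+    #       condition:
+    #         type: "equals"
+    #         field: "status"
+    #         value: "active"
+    #
+    # The step's output is simply the string "condition_met" or
+    # "condition_not_met".
+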
async def _execute_loop(self, step: WorkflowStep, execution: WorkflowExecution) -> List[Any]: + """Execute loop step""" + config = step.config + loop_variable = config.get('loop_variable') + loop_data = execution.variables.get(loop_variable, []) + loop_steps = config.get('steps', []) + + results = [] + for item in loop_data: + # Set current item as variable + execution.variables['_current_item'] = item + + # Execute loop steps (simplified) + for loop_step_config in loop_steps: + # This is a simplified implementation + # In practice, you'd parse and execute these as full workflow steps + pass + + return results + + async def _execute_parallel(self, step: WorkflowStep, execution: WorkflowExecution) -> List[Any]: + """Execute parallel step""" + config = step.config + parallel_steps = config.get('steps', []) + + # Execute all parallel steps concurrently + tasks = [] + for parallel_step_config in parallel_steps: + # This is a simplified implementation + # In practice, you'd parse and execute these as full workflow steps + pass + + results = await asyncio.gather(*tasks) + return results + + async def _execute_delay(self, step: WorkflowStep, execution: WorkflowExecution) -> None: + """Execute delay step""" + config = step.config + delay_seconds = config.get('seconds', 1) + await asyncio.sleep(delay_seconds) + + async def _execute_script(self, step: WorkflowStep, execution: WorkflowExecution) -> Any: + """Execute script step (simplified)""" + config = step.config + script = config.get('script', '') + language = config.get('language', 'python') + + if language == 'python': + # Very basic Python script execution (security risk in production) + local_vars = execution.variables.copy() + exec(script, {}, local_vars) + + # Update execution variables with changes + execution.variables.update(local_vars) + + return local_vars.get('result') + else: + raise ValueError(f"Unsupported script language: {language}") + + async def _execute_notification(self, step: WorkflowStep, execution: WorkflowExecution) -> None: + """Execute notification step""" + config = step.config + message = self._substitute_variables(config.get('message', ''), execution.variables) + await self._send_notification(config, message) + + async def _execute_cleanup(self, step: WorkflowStep, execution: WorkflowExecution) -> None: + """Execute cleanup step""" + config = step.config + cleanup_variables = config.get('variables', []) + + for var_name in cleanup_variables: + execution.variables.pop(var_name, None) + + # Helper Methods + + def _substitute_variables(self, text: str, variables: Dict[str, Any]) -> str: + """Substitute variables in text using ${variable} format""" + import re + + def replace_var(match): + var_name = match.group(1) + return str(variables.get(var_name, match.group(0))) + + return re.sub(r'\$\{([^}]+)\}', replace_var, text) + + def _evaluate_condition(self, data: Any, condition: Dict[str, Any]) -> bool: + """Evaluate a condition against data""" + condition_type = condition.get('type') + + if condition_type == 'equals': + field = condition.get('field') + value = condition.get('value') + if isinstance(data, dict): + return data.get(field) == value + else: + return data == value + elif condition_type == 'greater_than': + field = condition.get('field') + value = condition.get('value') + if isinstance(data, dict): + return data.get(field, 0) > value + else: + return data > value + elif condition_type == 'contains': + field = condition.get('field') + value = condition.get('value') + if isinstance(data, dict): + return value in 
str(data.get(field, '')) + else: + return value in str(data) + + return False + + def _apply_mapping(self, item: Any, mapping: Dict[str, str]) -> Dict[str, Any]: + """Apply field mapping to an item""" + if not isinstance(item, dict): + return item + + result = {} + for new_field, old_field in mapping.items(): + result[new_field] = item.get(old_field) + + return result + + async def _send_notification(self, config: Dict[str, Any], message: str): + """Send notification (simplified implementation)""" + notification_type = config.get('type') + + if notification_type == 'email': + # Implement email notification + print(f"EMAIL: {message}") + elif notification_type == 'slack': + # Implement Slack notification + print(f"SLACK: {message}") + elif notification_type == 'webhook': + # Implement webhook notification + import aiohttp + webhook_url = config.get('url') + if webhook_url: + async with aiohttp.ClientSession() as session: + await session.post(webhook_url, json={'message': message}) + else: + print(f"NOTIFICATION: {message}") + + +class WorkflowScheduler: + """ + Workflow scheduling and management + """ + + def __init__(self, workflow_engine: WorkflowEngine): + self.engine = workflow_engine + self.scheduled_workflows: Dict[str, Dict[str, Any]] = {} + self.running = False + + def schedule_workflow(self, workflow_name: str, schedule_config: Dict[str, Any]) -> str: + """Schedule a workflow for execution""" + schedule_id = str(uuid.uuid4()) + self.scheduled_workflows[schedule_id] = { + 'workflow_name': workflow_name, + 'schedule': schedule_config, + 'next_run': self._calculate_next_run(schedule_config) + } + return schedule_id + + def _calculate_next_run(self, schedule_config: Dict[str, Any]) -> datetime: + """Calculate next run time based on schedule configuration""" + schedule_type = schedule_config.get('type', 'cron') + + if schedule_type == 'interval': + interval_seconds = schedule_config.get('interval_seconds', 3600) + return datetime.now() + timedelta(seconds=interval_seconds) + elif schedule_type == 'daily': + time_str = schedule_config.get('time', '00:00') + hour, minute = map(int, time_str.split(':')) + next_run = datetime.now().replace(hour=hour, minute=minute, second=0, microsecond=0) + if next_run <= datetime.now(): + next_run += timedelta(days=1) + return next_run + elif schedule_type == 'cron': + # Simplified cron implementation + # In practice, use a library like croniter + return datetime.now() + timedelta(hours=1) + + return datetime.now() + timedelta(hours=1) + + async def start_scheduler(self): + """Start the workflow scheduler""" + self.running = True + while self.running: + current_time = datetime.now() + + for schedule_id, schedule_info in list(self.scheduled_workflows.items()): + if current_time >= schedule_info['next_run']: + # Execute workflow + workflow_name = schedule_info['workflow_name'] + try: + await self.engine.execute_workflow(workflow_name) + # Update next run time + schedule_info['next_run'] = self._calculate_next_run(schedule_info['schedule']) + except Exception as e: + print(f"Scheduled workflow '{workflow_name}' failed: {e}") + + # Sleep for a minute before checking again + await asyncio.sleep(60) + + def stop_scheduler(self): + """Stop the workflow scheduler""" + self.running = False + + +# CLI Integration for Workflow Management +class WorkflowCLI: + """CLI interface for workflow management""" + + def __init__(self): + self.engine = WorkflowEngine() + self.scheduler = WorkflowScheduler(self.engine) + + def create_workflow_command(self, config_file: str) -> 
str: + """CLI command to create workflow from file""" + return self.engine.load_workflow_from_file(config_file) + + async def run_workflow_command(self, workflow_name: str, variables: Optional[Dict[str, Any]] = None) -> str: + """CLI command to run workflow""" + return await self.engine.execute_workflow(workflow_name, variables) + + def list_workflows_command(self) -> List[str]: + """CLI command to list workflows""" + return self.engine.list_workflows() + + def status_command(self, execution_id: str) -> Optional[WorkflowExecution]: + """CLI command to get execution status""" + return self.engine.get_execution_status(execution_id) + + def schedule_workflow_command(self, workflow_name: str, schedule_config: Dict[str, Any]) -> str: + """CLI command to schedule workflow""" + return self.scheduler.schedule_workflow(workflow_name, schedule_config) \ No newline at end of file diff --git a/tests/test_enhanced_features.py b/tests/test_enhanced_features.py new file mode 100644 index 0000000..eae32e5 --- /dev/null +++ b/tests/test_enhanced_features.py @@ -0,0 +1,495 @@ +""" +Tests for enhanced workflow automation and API testing features +""" + +import asyncio +import json +import pytest +import tempfile +import yaml +from pathlib import Path +from unittest.mock import Mock, AsyncMock, patch + +from jsonAI.enhanced_workflow_engine import ( + WorkflowEngine, WorkflowDefinition, WorkflowStep, + WorkflowStepType, WorkflowStatus, WorkflowScheduler +) +from jsonAI.api_collection_tester import ( + APICollectionTester, APITestRequest, APICollection, APITestResult +) +from jsonAI.model_backends import DummyBackend + + +class TestWorkflowEngine: + """Test the enhanced workflow engine""" + + @pytest.fixture + def workflow_engine(self): + return WorkflowEngine(DummyBackend()) + + @pytest.fixture + def simple_workflow_data(self): + return { + "name": "test_workflow", + "version": "1.0", + "description": "Test workflow", + "variables": {"test_var": "test_value"}, + "steps": [ + { + "id": "step1", + "name": "Generate Data", + "type": "json_generation", + "config": { + "schema": {"type": "object", "properties": {"name": {"type": "string"}}}, + "prompt": "Generate test data", + "output_variable": "generated_data" + } + }, + { + "id": "step2", + "name": "Validate Data", + "type": "data_validation", + "depends_on": ["step1"], + "config": { + "data_variable": "generated_data", + "schema": {"type": "object", "properties": {"name": {"type": "string"}}} + } + } + ] + } + + def test_create_workflow(self, workflow_engine, simple_workflow_data): + """Test workflow creation""" + workflow_name = workflow_engine.create_workflow(simple_workflow_data) + assert workflow_name == "test_workflow" + assert "test_workflow" in workflow_engine.workflows + + workflow = workflow_engine.workflows["test_workflow"] + assert workflow.name == "test_workflow" + assert workflow.version == "1.0" + assert len(workflow.steps) == 2 + + def test_load_workflow_from_file(self, workflow_engine, simple_workflow_data): + """Test loading workflow from YAML file""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: + yaml.dump(simple_workflow_data, f) + temp_file = f.name + + try: + workflow_name = workflow_engine.load_workflow_from_file(temp_file) + assert workflow_name == "test_workflow" + assert "test_workflow" in workflow_engine.workflows + finally: + Path(temp_file).unlink() + + @pytest.mark.asyncio + async def test_execute_workflow(self, workflow_engine, simple_workflow_data): + """Test workflow execution""" + 
workflow_name = workflow_engine.create_workflow(simple_workflow_data) + execution_id = await workflow_engine.execute_workflow(workflow_name) + + assert execution_id is not None + execution = workflow_engine.get_execution_status(execution_id) + assert execution is not None + assert execution.workflow_name == "test_workflow" + assert execution.status in [WorkflowStatus.COMPLETED, WorkflowStatus.FAILED] + + def test_dependency_graph(self, workflow_engine, simple_workflow_data): + """Test dependency graph building""" + workflow = workflow_engine._parse_workflow_definition(simple_workflow_data) + graph = workflow_engine._build_dependency_graph(workflow.steps) + + assert "step1" in graph + assert "step2" in graph + assert graph["step1"] == [] # No dependencies + assert graph["step2"] == ["step1"] # Depends on step1 + + def test_variable_substitution(self, workflow_engine): + """Test variable substitution""" + variables = {"name": "test", "value": 123} + text = "Hello ${name}, your value is ${value}" + result = workflow_engine._substitute_variables(text, variables) + assert result == "Hello test, your value is 123" + + def test_condition_evaluation(self, workflow_engine): + """Test condition evaluation""" + data = {"status": "active", "count": 5} + + # Test equals condition + condition = {"type": "equals", "field": "status", "value": "active"} + assert workflow_engine._evaluate_condition(data, condition) is True + + # Test greater_than condition + condition = {"type": "greater_than", "field": "count", "value": 3} + assert workflow_engine._evaluate_condition(data, condition) is True + + # Test contains condition + condition = {"type": "contains", "field": "status", "value": "act"} + assert workflow_engine._evaluate_condition(data, condition) is True + + +class TestAPICollectionTester: + """Test the API collection testing framework""" + + @pytest.fixture + def api_tester(self): + return APICollectionTester(DummyBackend()) + + @pytest.fixture + def sample_postman_collection(self): + return { + "info": { + "name": "Test Collection", + "description": "Test API collection" + }, + "variable": [ + {"key": "baseUrl", "value": "https://api.test.com"} + ], + "item": [ + { + "name": "Get Users", + "request": { + "method": "GET", + "url": { + "raw": "{{baseUrl}}/users", + "host": ["{{baseUrl}}"], + "path": ["users"] + }, + "header": [ + {"key": "Accept", "value": "application/json"} + ] + } + }, + { + "name": "Create User", + "request": { + "method": "POST", + "url": "{{baseUrl}}/users", + "header": [ + {"key": "Content-Type", "value": "application/json"} + ], + "body": { + "mode": "raw", + "raw": '{"name": "Test User", "email": "test@example.com"}' + } + } + } + ] + } + + @pytest.fixture + def sample_openapi_spec(self): + return { + "openapi": "3.0.0", + "info": { + "title": "Test API", + "version": "1.0.0" + }, + "servers": [ + {"url": "https://api.test.com"} + ], + "paths": { + "/users": { + "get": { + "operationId": "getUsers", + "responses": { + "200": { + "description": "Success", + "content": { + "application/json": { + "schema": { + "type": "array", + "items": { + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"} + } + } + } + } + } + } + } + }, + "post": { + "operationId": "createUser", + "requestBody": { + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "email": {"type": "string"} + } + } + } + } + }, + "responses": { + "201": { + "description": "Created" + } + } + } + } + } + } + + 
def test_parse_postman_collection(self, api_tester, sample_postman_collection): + """Test parsing Postman collection""" + collection = api_tester._parse_postman_collection(sample_postman_collection) + + assert collection.name == "Test Collection" + assert collection.description == "Test API collection" + assert "baseUrl" in collection.variables + assert len(collection.requests) == 2 + + get_request = collection.requests[0] + assert get_request.name == "Get Users" + assert get_request.method == "GET" + assert "{{baseUrl}}/users" in get_request.url + + def test_parse_openapi_spec(self, api_tester, sample_openapi_spec): + """Test parsing OpenAPI specification""" + collection = api_tester._parse_openapi_spec(sample_openapi_spec) + + assert collection.name == "Test API" + assert collection.base_url == "https://api.test.com" + assert len(collection.requests) == 2 + + # Check that test data schema is extracted for POST request + post_request = next(r for r in collection.requests if r.method == "POST") + assert post_request.test_data_schema is not None + assert "properties" in post_request.test_data_schema + + def test_import_postman_collection_from_file(self, api_tester, sample_postman_collection): + """Test importing Postman collection from file""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: + json.dump(sample_postman_collection, f) + temp_file = f.name + + try: + collection_id = api_tester.import_postman_collection(temp_file) + assert collection_id is not None + assert collection_id in api_tester.collections + finally: + Path(temp_file).unlink() + + def test_import_openapi_spec_from_file(self, api_tester, sample_openapi_spec): + """Test importing OpenAPI spec from file""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: + json.dump(sample_openapi_spec, f) + temp_file = f.name + + try: + collection_id = api_tester.import_openapi_spec(temp_file) + assert collection_id is not None + assert collection_id in api_tester.collections + finally: + Path(temp_file).unlink() + + def test_generate_test_data_for_collection(self, api_tester, sample_openapi_spec): + """Test test data generation for collection""" + collection = api_tester._parse_openapi_spec(sample_openapi_spec) + collection_id = "test_collection" + api_tester.collections[collection_id] = collection + + test_data = api_tester.generate_test_data_for_collection(collection_id) + + # Should generate data for requests with test_data_schema + post_request = next(r for r in collection.requests if r.method == "POST") + assert post_request.id in test_data + + @pytest.mark.asyncio + async def test_execute_request(self, api_tester): + """Test single request execution""" + request = APITestRequest( + id="test_request", + name="Test Request", + method="GET", + url="https://httpbin.org/get", + expected_status=200 + ) + + collection = APICollection( + name="Test Collection", + description="Test", + base_url="https://httpbin.org" + ) + + # Mock the HTTP request + with patch('aiohttp.ClientSession.request') as mock_request: + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value={"success": True}) + mock_request.return_value.__aenter__.return_value = mock_response + + result = await api_tester._execute_request(request, collection, None) + + assert result.success is True + assert result.status_code == 200 + assert result.request_id == "test_request" + + def test_variable_substitution(self, api_tester): + """Test variable substitution in URLs and 
bodies""" + variables = {"baseUrl": "https://api.test.com", "userId": "123"} + + url = "{{baseUrl}}/users/{{userId}}" + result = api_tester._substitute_variables(url, variables) + assert result == "https://api.test.com/users/123" + + def test_generate_html_report(self, api_tester): + """Test HTML report generation""" + # Add some test results + api_tester.test_results = [ + APITestResult( + request_id="1", + name="Test 1", + success=True, + status_code=200, + response_time=1.5 + ), + APITestResult( + request_id="2", + name="Test 2", + success=False, + status_code=500, + response_time=2.0, + error_message="Server error" + ) + ] + + report = api_tester._generate_html_report() + + assert "JsonAI API Test Report" in report + assert "Total: 2" in report + assert "Passed: 1" in report + assert "Failed: 1" in report + assert "Test 1 - PASS" in report + assert "Test 2 - FAIL" in report + + def test_generate_json_report(self, api_tester): + """Test JSON report generation""" + api_tester.test_results = [ + APITestResult( + request_id="1", + name="Test 1", + success=True, + status_code=200, + response_time=1.5 + ) + ] + + report = api_tester._generate_json_report() + report_data = json.loads(report) + + assert "summary" in report_data + assert "results" in report_data + assert report_data["summary"]["total_tests"] == 1 + assert report_data["summary"]["passed_tests"] == 1 + assert len(report_data["results"]) == 1 + + +class TestWorkflowScheduler: + """Test the workflow scheduler""" + + @pytest.fixture + def workflow_engine(self): + return WorkflowEngine(DummyBackend()) + + @pytest.fixture + def scheduler(self, workflow_engine): + return WorkflowScheduler(workflow_engine) + + def test_schedule_workflow(self, scheduler): + """Test workflow scheduling""" + schedule_config = { + "type": "daily", + "time": "09:00" + } + + schedule_id = scheduler.schedule_workflow("test_workflow", schedule_config) + + assert schedule_id is not None + assert schedule_id in scheduler.scheduled_workflows + + scheduled_info = scheduler.scheduled_workflows[schedule_id] + assert scheduled_info["workflow_name"] == "test_workflow" + assert scheduled_info["schedule"] == schedule_config + assert "next_run" in scheduled_info + + def test_calculate_next_run_daily(self, scheduler): + """Test daily schedule calculation""" + schedule_config = {"type": "daily", "time": "09:00"} + next_run = scheduler._calculate_next_run(schedule_config) + + assert next_run.hour == 9 + assert next_run.minute == 0 + + def test_calculate_next_run_interval(self, scheduler): + """Test interval schedule calculation""" + schedule_config = {"type": "interval", "interval_seconds": 3600} + next_run = scheduler._calculate_next_run(schedule_config) + + # Should be approximately 1 hour from now + from datetime import datetime, timedelta + expected = datetime.now() + timedelta(hours=1) + assert abs((next_run - expected).total_seconds()) < 60 # Within 1 minute + + +class TestIntegration: + """Integration tests for workflow and API testing""" + + @pytest.mark.asyncio + async def test_workflow_with_api_testing(self): + """Test workflow that includes API testing steps""" + workflow_data = { + "name": "integration_test_workflow", + "version": "1.0", + "steps": [ + { + "id": "generate_data", + "type": "json_generation", + "config": { + "schema": {"type": "object", "properties": {"name": {"type": "string"}}}, + "prompt": "Generate test data", + "output_variable": "test_data" + } + }, + { + "id": "api_test", + "type": "api_request", + "depends_on": ["generate_data"], + 
"config": { + "method": "POST", + "url": "https://httpbin.org/post", + "body": "${test_data}", + "expected_status": 200 + } + } + ] + } + + engine = WorkflowEngine(DummyBackend()) + workflow_name = engine.create_workflow(workflow_data) + + # Mock HTTP request + with patch('aiohttp.ClientSession.request') as mock_request: + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value={"success": True}) + mock_request.return_value.__aenter__.return_value = mock_response + + execution_id = await engine.execute_workflow(workflow_name) + execution = engine.get_execution_status(execution_id) + + assert execution.status == WorkflowStatus.COMPLETED + assert len(execution.step_results) == 2 + assert all(result.status == WorkflowStatus.COMPLETED + for result in execution.step_results.values()) + + +if __name__ == "__main__": + pytest.main([__file__]) \ No newline at end of file