Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions agents/nlp_pipeline/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""NLP Context Pipeline module for Sentinel-D DevSecOps project."""

__version__ = "1.0.0"
132 changes: 132 additions & 0 deletions agents/nlp_pipeline/fetchers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""Asynchronous fetchers for external APIs."""

import asyncio
import logging
import json
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
import aiohttp
import hashlib

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class NVDFetcher:
"""Fetches security data from NVD 2.0 API with local response caching."""

BASE_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"
CACHE_DURATION = timedelta(hours=24)

def __init__(self, api_key: Optional[str] = None):
"""
Initialize NVDFetcher.

Args:
api_key: Optional NVD API key for higher rate limits.
"""
self.api_key = api_key
self.cache: Dict[str, tuple[Dict[str, Any], datetime]] = {}

async def fetch(self, cve_id: str) -> Dict[str, Any]:
"""
Fetch CVE data from NVD API with caching.

Args:
cve_id: CVE identifier (e.g., CVE-2024-1234).

Returns:
Dictionary containing CVE details or empty dict on failure.
"""
cache_key = hashlib.md5(cve_id.encode()).hexdigest()

# Check cache
if cache_key in self.cache:
cached_data, timestamp = self.cache[cache_key]
if datetime.utcnow() - timestamp < self.CACHE_DURATION:
logger.debug(f"Cache hit for CVE {cve_id}")
return cached_data

try:
async with aiohttp.ClientSession() as session:
params = {"cveId": cve_id}
if self.api_key:
params["apiKey"] = self.api_key

async with session.get(
self.BASE_URL,
params=params,
timeout=aiohttp.ClientTimeout(total=10)
) as response:
if response.status == 200:
data = await response.json()
# Cache the response
self.cache[cache_key] = (data, datetime.utcnow())
logger.info(f"Successfully fetched NVD data for {cve_id}")
return data
else:
logger.warning(f"NVD API error {response.status} for {cve_id}")
return {}
except asyncio.TimeoutError:
logger.error(f"Timeout fetching NVD data for {cve_id}")
return {}
except aiohttp.ClientError as e:
logger.error(f"Client error fetching NVD data for {cve_id}: {e}")
return {}
except Exception as e:
logger.error(f"Unexpected error fetching NVD data for {cve_id}: {e}")
return {}


class StackOverflowFetcher:
"""Fetches context from Stack Exchange API v2.3."""

BASE_URL = "https://api.stackexchange.com/2.3/search/advanced"
SITE = "stackoverflow"

def __init__(self):
"""Initialize StackOverflowFetcher."""
pass

async def fetch(self, affected_package: str, limit: int = 5) -> Dict[str, Any]:
"""
Fetch Stack Overflow answers for a package with filtering by score.

Args:
affected_package: Package name to search for.
limit: Maximum number of top-scored answers to return.

Returns:
Dictionary containing Stack Overflow search results or empty dict on failure.
"""
try:
async with aiohttp.ClientSession() as session:
params = {
"q": affected_package,
"sort": "votes",
"order": "desc",
"site": self.SITE,
"pagesize": limit,
}

async with session.get(
self.BASE_URL,
params=params,
timeout=aiohttp.ClientTimeout(total=10)
) as response:
if response.status == 200:
data = await response.json()
logger.info(f"Successfully fetched Stack Overflow data for {affected_package}")
return data
else:
logger.warning(f"Stack Exchange API error {response.status} for {affected_package}")
return {}
except asyncio.TimeoutError:
logger.error(f"Timeout fetching Stack Overflow data for {affected_package}")
return {}
except aiohttp.ClientError as e:
logger.error(f"Client error fetching Stack Overflow data for {affected_package}: {e}")
return {}
except Exception as e:
logger.error(f"Unexpected error fetching Stack Overflow data for {affected_package}: {e}")
return {}
150 changes: 150 additions & 0 deletions agents/nlp_pipeline/ml_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
"""ML model inference wrappers for NLP Context Pipeline."""

import logging
from typing import Dict, Any, List, Tuple

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class EntityExtractor:
"""
Mock wrapper for fine-tuned spaCy NER model.

Extracts breaking changes and migration steps from raw text.
"""

def __init__(self, model_name: str = "spacy-ner-model-v1"):
"""
Initialize EntityExtractor.

Args:
model_name: Name of the NER model to use.
"""
self.model_name = model_name
logger.info(f"Initialized EntityExtractor with model: {model_name}")

def extract(self, text: str) -> Tuple[List[Dict[str, Any]], List[str]]:
"""
Extract breaking changes and migration steps from text.

Args:
text: Raw text from NVD or other sources.

Returns:
Tuple of (breaking_changes list, migration_steps list).
"""
logger.debug(f"Extracting entities from text of length {len(text)}")

# Mock implementation returning structured breaking changes
breaking_changes = [
{
"entity": "API_CHANGE",
"description": "Endpoint /api/v2/deprecated removed",
"severity": "HIGH",
"affected_functions": ["getUser()", "listUsers()"],
"remediation": "Use /api/v3/users instead"
},
{
"entity": "DEPENDENCY_REMOVAL",
"description": "Legacy XML parsing library removed",
"severity": "MEDIUM",
"affected_functions": ["parseXML()", "validateSchema()"],
"remediation": "Migrate to modern JSON-based APIs"
}
]

# Mock implementation returning migration steps
migration_steps = [
"Step 1: Review deprecated API endpoints in v2",
"Step 2: Update all API calls to v3 endpoints",
"Step 3: Test with new JSON serialization format",
"Step 4: Update error handling for new exception types",
"Step 5: Perform integration testing in staging environment",
"Step 6: Deploy with feature flag for gradual rollout"
]

logger.info(f"Extracted {len(breaking_changes)} breaking changes and {len(migration_steps)} migration steps")
return breaking_changes, migration_steps


class IntentClassifier:
"""
Mock wrapper for fine-tuned DistilBERT model.

Classifies community intent from Stack Overflow and other sources.
"""

INTENT_CLASSES = [
"API_MIGRATION",
"VERSION_PIN",
"SECURITY_FIX",
"PERFORMANCE_OPTIMIZATION",
"DEPENDENCY_UPDATE",
"BREAKING_CHANGE_MIGRATION",
"ROLLBACK_REQUIRED"
]

def __init__(self, model_name: str = "distilbert-intent-classifier-v1"):
"""
Initialize IntentClassifier.

Args:
model_name: Name of the DistilBERT model to use.
"""
self.model_name = model_name
logger.info(f"Initialized IntentClassifier with model: {model_name}")

def classify(
self, text: str
) -> Tuple[str, float]:
"""
Classify community intent from text.

Args:
text: Raw community text from Stack Overflow or other sources.

Returns:
Tuple of (intent_class, confidence_score) where confidence is 0.0-1.0.
"""
logger.debug(f"Classifying intent from text of length {len(text)}")

# Mock implementation: simulate classification with confidence
# In production, this would call the actual DistilBERT model
text_lower = text.lower()

intent_scores: Dict[str, float] = {
"API_MIGRATION": 0.0,
"VERSION_PIN": 0.0,
"SECURITY_FIX": 0.0,
"PERFORMANCE_OPTIMIZATION": 0.0,
"DEPENDENCY_UPDATE": 0.0,
"BREAKING_CHANGE_MIGRATION": 0.0,
"ROLLBACK_REQUIRED": 0.0,
}

# Simple keyword-based mock scoring
if any(keyword in text_lower for keyword in ["api", "endpoint", "rest", "graphql"]):
intent_scores["API_MIGRATION"] += 0.3
if any(keyword in text_lower for keyword in ["version", "pin", "lock", "freeze"]):
intent_scores["VERSION_PIN"] += 0.3
if any(keyword in text_lower for keyword in ["security", "vulnerability", "cve", "patch"]):
intent_scores["SECURITY_FIX"] += 0.4
if any(keyword in text_lower for keyword in ["performance", "speed", "optimization", "latency"]):
intent_scores["PERFORMANCE_OPTIMIZATION"] += 0.25
if any(keyword in text_lower for keyword in ["dependency", "require", "import", "package"]):
intent_scores["DEPENDENCY_UPDATE"] += 0.3
if any(keyword in text_lower for keyword in ["breaking", "incompatible", "change", "migration"]):
intent_scores["BREAKING_CHANGE_MIGRATION"] += 0.4
if any(keyword in text_lower for keyword in ["rollback", "revert", "downgrade", "issue"]):
intent_scores["ROLLBACK_REQUIRED"] += 0.35

# Normalize scores and select highest
max_intent = max(intent_scores, key=intent_scores.get)
max_score = intent_scores[max_intent]

# Ensure score is between 0 and 1, with fallback to mock value
confidence = min(max(max_score, 0.65), 0.95) if max_score > 0 else 0.72

logger.info(f"Classified intent as {max_intent} with confidence {confidence:.2f}")
return max_intent, confidence
Loading