Security is a core principle of AWAS. This document outlines security considerations and best practices.
- Threat Model
- Authentication & Authorization
- Rate Limiting
- Input Validation
- CSRF Protection
- Privacy Considerations
- Audit Logging
- Security Headers
- Abuse by Malicious AI Agents
  - Automated scraping at scale
  - Resource exhaustion attacks
  - Data exfiltration
- Action Injection
  - Manipulating action definitions
  - Bypassing authentication
  - Privilege escalation
- Privacy Violations
  - Unauthorized data access
  - User tracking
  - Data training without consent
- Traditional Web Attacks
  - XSS, CSRF, SQL injection
  - Session hijacking
  - Man-in-the-middle attacks
Clearly specify authentication requirements in your manifest:
{
"actions": [
{
"id": "add_to_cart",
"authentication_required": true,
"authorization": {
"roles": ["authenticated_user"],
"scopes": ["cart:write"]
}
}
]
}

Session-Based Auth:
from functools import wraps
from flask import session, request, jsonify
def require_auth(f):
    """Decorator that rejects the request unless the session has a ``user_id``.

    Returns a JSON error with status 401 when the key is absent; otherwise
    calls the wrapped view with its original arguments.
    """
    @wraps(f)
    def guarded(*args, **kwargs):
        if 'user_id' in session:
            return f(*args, **kwargs)
        return jsonify({"error": "Authentication required"}), 401
    return guarded
@app.route('/api/cart/add', methods=['POST'])
@require_auth
def add_to_cart():
# User is authenticated
passAPI Key Auth:
def validate_api_key(f):
@wraps(f)
def decorated_function(*args, **kwargs):
api_key = request.headers.get('X-API-Key')
if not api_key or not is_valid_key(api_key):
return jsonify({"error": "Invalid API key"}), 403
return f(*args, **kwargs)
return decorated_functionJWT Auth:
import jwt
def require_jwt(f):
@wraps(f)
def decorated_function(*args, **kwargs):
token = request.headers.get('Authorization', '').replace('Bearer ', '')
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
request.user = payload
except jwt.InvalidTokenError:
return jsonify({"error": "Invalid token"}), 401
return f(*args, **kwargs)
return decorated_functionFor third-party AI agents:
from authlib.integrations.flask_oauth2 import ResourceProtector
from authlib.oauth2.rfc6750 import BearerTokenValidator
require_oauth = ResourceProtector()
@app.route('/api/protected', methods=['POST'])
@require_oauth('scope:action')
def protected_action():
# OAuth-protected action
passImplement different limits for different user types:
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
# Pass both arguments by keyword. Flask-Limiter 3.x takes key_func as its
# first positional parameter, so Limiter(app, key_func=...) binds `app` to
# key_func and raises TypeError; keywords work on 2.x and 3.x alike.
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    storage_uri="redis://localhost:6379",
)
def get_rate_limit():
    """Return the rate-limit string for the current request.

    Non-agent requests get the highest limit; AI agents get a lower limit,
    raised when their ``X-AI-Agent-Name`` header is listed in TRUSTED_AGENTS.
    """
    if not is_ai_agent(request):
        return "300 per minute"  # Human users
    agent_name = request.headers.get('X-AI-Agent-Name', '')
    is_trusted = agent_name in TRUSTED_AGENTS
    # Trusted agents vs. standard AI agents
    return "120 per minute" if is_trusted else "60 per minute"
@app.route('/api/search')
@limiter.limit(get_rate_limit)
def search():
pass{
"actions": [
{
"id": "search_products",
"rate_limit": {
"requests": 60,
"window": "minute"
}
},
{
"id": "create_order",
"rate_limit": {
"requests": 10,
"window": "hour"
}
}
]
}

from redis import Redis
import time
redis_client = Redis()
def check_burst_limit(client_id, burst_limit=10, window=60):
"""Check if client exceeded burst limit"""
key = f"burst:{client_id}"
current = redis_client.incr(key)
if current == 1:
redis_client.expire(key, window)
if current > burst_limit:
return False
return TrueNEVER trust client-side validation alone:
from marshmallow import Schema, fields, validate, ValidationError
class AddToCartSchema(Schema):
    """Server-side validation rules for the add-to-cart request body."""

    # Required, 1-50 characters.
    product_id = fields.Str(
        required=True,
        validate=validate.Length(min=1, max=50),
    )
    # Required, between 1 and 100 inclusive.
    quantity = fields.Int(
        required=True,
        validate=validate.Range(min=1, max=100),
    )
    # Optional, at most 50 characters.
    variant_id = fields.Str(validate=validate.Length(max=50))
@app.route('/api/cart/add', methods=['POST'])
def add_to_cart():
schema = AddToCartSchema()
try:
data = schema.load(request.json)
except ValidationError as err:
return jsonify({"errors": err.messages}), 400
# Process valid data
passUse parameterized queries:
# NEVER do this: interpolating product_id into the SQL text lets a crafted
# value change the query itself (SQL injection).
cursor.execute(f"SELECT * FROM products WHERE id = '{product_id}'")
# Always do this: the value is passed separately from the SQL text.
# NOTE(review): "?" is the placeholder style of some DB-API drivers; others
# use "%s" -- confirm for the driver in use.
cursor.execute("SELECT * FROM products WHERE id = ?", (product_id,))

Sanitize outputs:
from markupsafe import escape
@app.route('/api/search')
def search():
    """Example search endpoint that HTML-escapes the ``q`` parameter (stub)."""
    raw_query = request.args.get('q', '')
    query = escape(raw_query)
    # Use sanitized query
    pass

from flask_wtf.csrf import CSRFProtect
csrf = CSRFProtect(app)


# Exempt AI agent endpoints if using other auth
@app.route('/api/action', methods=['POST'])
@csrf.exempt  # Only if using API key/OAuth
def action():
    """Accept AI-agent requests without a CSRF token but still protect browsers.

    Non-agent requests that fail the CSRF check are rejected by
    ``csrf.protect()``.
    """
    if is_ai_agent(request):
        # Validate API key instead
        pass
    else:
        # @csrf.exempt turns off the automatic CSRF check for every caller of
        # this view, not just AI agents, so browser requests must be
        # validated explicitly here.
        csrf.protect()

import secrets
def generate_csrf_token():
    """Return the session's CSRF token, creating one on first use.

    A new token is 32 random bytes rendered as hex and is stored in the
    session under ``csrf_token``.
    """
    try:
        return session['csrf_token']
    except KeyError:
        session['csrf_token'] = secrets.token_hex(32)
        return session['csrf_token']
def validate_csrf_token():
token = request.headers.get('X-CSRF-Token') or request.form.get('csrf_token')
if not token or token != session.get('csrf_token'):
abort(403)Only expose necessary data:
{
"actions": [
{
"id": "search_users",
"outputs": {
"fields": ["user_id", "username"],
"excluded": ["email", "password_hash", "ip_address"]
}
}
]
}

Respect user privacy preferences:
@app.route('/api/action', methods=['POST'])
def action():
    """Log the request minimally when the caller sent ``X-AI-No-Training: true``."""
    opted_out = request.headers.get('X-AI-No-Training') == 'true'
    # Don't log detailed data for AI training
    record = log_minimal if opted_out else log_full
    record(request)

import re
def mask_pii(data):
    """Mask personally identifiable information.

    Mutates ``data`` in place (``email`` and ``phone`` keys, when present)
    and returns the same dict.
    """
    # Mask email: keep at most the first two characters before the "@".
    # `{0,2}` rather than `{2}` so that one-character local parts such as
    # "a@example.com" still match; with `{2}` they were returned unmasked.
    if 'email' in data:
        data['email'] = re.sub(r'(.{0,2}).*(@.*)', r'\1***\2', data['email'])
    # Mask phone: hide the middle four of ten consecutive digits.
    if 'phone' in data:
        data['phone'] = re.sub(r'(\d{3})\d{4}(\d{3})', r'\1****\2', data['phone'])
    return data


@app.route('/api/user/data', methods=['GET'])
@require_auth
def get_user_data():
    """Right to data portability"""
    user_id = session['user_id']
    data = get_all_user_data(user_id)
    return jsonify(data)
@app.route('/api/user/delete', methods=['DELETE'])
@require_auth
def delete_user():
    """Right to be forgotten"""
    # Delete everything stored for the logged-in user.
    delete_all_user_data(session['user_id'])
    return jsonify({"success": True})

import logging
from datetime import datetime
def log_ai_action(action_id, user_id, params, result):
    """Log AI agent actions for audit trail.

    Builds one entry from the arguments and the current request, writes it to
    the root logger as JSON, and hands it to ``store_audit_log``.
    ``result`` must support ``.get`` (its ``status_code`` defaults to 200).
    """
    # json.dumps is used below, but json is not among the imports shown with
    # this example; import it here so the snippet runs as written.
    import json

    log_entry = {
        'timestamp': datetime.utcnow().isoformat(),
        'action_id': action_id,
        'user_id': user_id,
        'ai_agent': request.headers.get('X-AI-Agent-Name', 'Unknown'),
        'ip_address': request.remote_addr,
        'params': params,
        'result': result,
        'status_code': result.get('status_code', 200)
    }
    logging.info(f"AI_ACTION: {json.dumps(log_entry)}")
    # Store in database for compliance
    store_audit_log(log_entry)


def log_security_event(event_type, details):
    """Log security-relevant events.

    Emits a warning on ``security_log`` with the event type, the caller's
    details, a UTC timestamp, and the request's remote address and User-Agent.
    """
    security_log.warning({
        'event_type': event_type,
        'details': details,
        'timestamp': datetime.utcnow().isoformat(),
        'ip': request.remote_addr,
        'user_agent': request.headers.get('User-Agent')
    })
# Usage
@app.route('/api/login', methods=['POST'])
def login():
    """Usage sketch: record a security event when authentication fails.

    ``failed_auth`` and ``username`` are placeholders that this snippet does
    not define.
    """
    if failed_auth:
        event_details = {
            'username': username,
            'reason': 'invalid_password'
        }
        log_security_event('failed_login', event_details)


@app.after_request
def add_security_headers(response):
    """Attach a fixed set of security headers to every response."""
    # NOTE(review): X-XSS-Protection is, to my knowledge, deprecated in
    # current browsers -- confirm whether '0' (or omitting it) is preferable.
    security_headers = {
        'X-Content-Type-Options': 'nosniff',
        'X-Frame-Options': 'DENY',
        'X-XSS-Protection': '1; mode=block',
        'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
        'Content-Security-Policy': "default-src 'self'",
        'Referrer-Policy': 'no-referrer',
    }
    for header_name, header_value in security_headers.items():
        response.headers[header_name] = header_value
    return response

from flask_cors import CORS
# Restrictive CORS for AI agents
CORS(app, resources={
r"/.well-known/*": {
"origins": "*",
"methods": ["GET"]
},
r"/api/*": {
"origins": ALLOWED_ORIGINS,
"methods": ["GET", "POST"],
"allow_headers": ["Content-Type", "X-AI-Agent", "Authorization"]
}
}){
"permissions": {
"guest": {
"allowed_actions": ["search_products", "view_product"],
"denied_actions": ["add_to_cart", "checkout"]
},
"authenticated": {
"allowed_actions": ["*"],
"denied_actions": ["admin_actions"]
},
"ai_agent": {
"allowed_actions": ["search_products", "view_product", "compare"],
"denied_actions": ["checkout", "payment", "account_modify"],
"requires_user_consent": ["add_to_cart", "create_order"]
}
}
}def check_permission(action_id, user_role):
"""Check if user has permission for action"""
permissions = load_permissions()
role_perms = permissions.get(user_role, {})
# Check denied first
if action_id in role_perms.get('denied_actions', []):
return False
# Check allowed
allowed = role_perms.get('allowed_actions', [])
if '*' in allowed or action_id in allowed:
return True
return False
@app.route('/api/action/<action_id>', methods=['POST'])
def execute_action(action_id):
    """Run an action only when the caller's role is permitted (stub).

    Returns a JSON error with status 403 when ``check_permission`` denies it.
    """
    permitted = check_permission(action_id, get_user_role())
    if not permitted:
        return jsonify({"error": "Forbidden"}), 403
    # Execute action
    pass


SENSITIVE_ACTIONS = ['checkout', 'payment', 'account_delete', 'data_export']
@app.route('/api/action/<action_id>', methods=['POST'])
def execute_action(action_id):
if action_id in SENSITIVE_ACTIONS:
if is_ai_agent(request):
return jsonify({
"error": "Human verification required",
"verification_url": f"/verify/{action_id}",
"message": "This action requires human confirmation"
}), 403
# Execute action
passBefore deploying AWAS:
- Rate limiting implemented and tested
- Authentication required for sensitive actions
- Input validation on all endpoints
- CSRF protection enabled
- SQL injection prevention verified
- XSS prevention implemented
- Security headers configured
- CORS properly restricted
- Audit logging in place
- PII protection mechanisms active
- GDPR compliance verified
- Sensitive actions require human verification
- Regular security testing scheduled
- Incident response plan documented
If you discover a security vulnerability in AWAS:
- DO NOT create a public GitHub issue
- Email security@awas.dev with details
- Include steps to reproduce
- Allow time for patching before disclosure
Stay informed about security updates:
- Watch the GitHub repository
- Subscribe to security announcements
- Review changelog for security patches
Last Updated: October 29, 2025