diff --git a/api/auth.py b/api/auth.py index 6bef45e877..c78252e256 100644 --- a/api/auth.py +++ b/api/auth.py @@ -355,6 +355,204 @@ def is_auth_enabled() -> bool: return is_password_auth_enabled() or are_passkeys_enabled() +# ── Trusted header authentication (issue #3351) ───────────────────────────── +# When a reverse proxy (Authelia, Authentik, nginx auth_request, etc.) handles +# authentication, the WebUI can trust headers sent by that proxy to identify +# the user and optionally bind them to a specific Hermes profile. +# +# Security-critical: the proxy IP MUST be allowlisted via +# HERMES_WEBUI_TRUSTED_AUTH_PROXY_IPS. Without this, any client can forge +# the trusted header and impersonate any user. +# ──────────────────────────────────────────────────────────────────────────── + +_TRUSTED_AUTH_HEADER: str | None = None +_TRUSTED_GROUPS_HEADER: str | None = None +_GROUP_PROFILE_MAP: dict[str, str] | None = None +_TRUSTED_PROXY_IPS: list | None = None +_TRUSTED_AUTH_LOGOUT_URL: str | None = None +_TRUSTED_AUTH_CACHE_LOCK = threading.Lock() +_TRUSTED_AUTH_CACHE_POPULATED = False + + +def _populate_trusted_auth_cache() -> None: + """Parse env vars once and cache the result.""" + global _TRUSTED_AUTH_CACHE_POPULATED + global _TRUSTED_AUTH_HEADER, _TRUSTED_GROUPS_HEADER + global _GROUP_PROFILE_MAP, _TRUSTED_PROXY_IPS, _TRUSTED_AUTH_LOGOUT_URL + with _TRUSTED_AUTH_CACHE_LOCK: + if _TRUSTED_AUTH_CACHE_POPULATED: + return + _TRUSTED_AUTH_HEADER = os.getenv('HERMES_WEBUI_TRUSTED_AUTH_HEADER', '').strip() or None + _TRUSTED_GROUPS_HEADER = os.getenv('HERMES_WEBUI_TRUSTED_GROUPS_HEADER', '').strip() or None + _TRUSTED_AUTH_LOGOUT_URL = os.getenv('HERMES_WEBUI_TRUSTED_AUTH_LOGOUT_URL', '').strip() or None + + # Parse group→profile map: "hermes_devops=devops,hermes_coworkers=coworkers" + raw_map = os.getenv('HERMES_WEBUI_GROUP_PROFILE_MAP', '').strip() + if raw_map: + _GROUP_PROFILE_MAP = {} + for segment in raw_map.split(','): + segment = segment.strip() + if '=' in segment: + group, profile = segment.split('=', 1) + _GROUP_PROFILE_MAP[group.strip()] = profile.strip() + else: + _GROUP_PROFILE_MAP = None + + # Parse proxy IP allowlist (CIDR or plain IPs) + raw_ips = os.getenv('HERMES_WEBUI_TRUSTED_AUTH_PROXY_IPS', '').strip() + if raw_ips: + _TRUSTED_PROXY_IPS = _parse_trusted_proxy_ips(raw_ips) + else: + # Default: loopback only — safe for local reverse-proxy setups + _TRUSTED_PROXY_IPS = _parse_trusted_proxy_ips('127.0.0.1,::1') + + _TRUSTED_AUTH_CACHE_POPULATED = True + + +def _parse_trusted_proxy_ips(raw: str) -> list: + """Parse a comma-separated list of IPs or CIDR ranges. + + Supports IPv4, IPv6, and CIDR notation (e.g. 10.0.0.0/8, 192.168.1.0/24). + Returns a list of (network_address, prefix_len) tuples for fast matching. + """ + import ipaddress + results = [] + for part in raw.split(','): + part = part.strip() + if not part: + continue + try: + network = ipaddress.ip_network(part, strict=False) + results.append((network.network_address, network.prefixlen)) + except ValueError: + logger.warning("Ignoring invalid trusted proxy IP/CIDR: %r", part) + return results + + +def _is_trusted_proxy(ip_str: str) -> bool: + """Return True if the given IP is in the trusted proxy allowlist.""" + _populate_trusted_auth_cache() + if not _TRUSTED_PROXY_IPS: + return False + import ipaddress + try: + client_ip = ipaddress.ip_address(ip_str) + except ValueError: + return False + for network_addr, prefix_len in _TRUSTED_PROXY_IPS: + try: + network = ipaddress.ip_network(f"{network_addr}/{prefix_len}", strict=False) + if client_ip in network: + return True + except ValueError: + continue + return False + + +def _get_client_ip(handler) -> str: + """Extract the client IP from the handler, preferring X-Forwarded-For.""" + # In reverse-proxy setups, the immediate peer is the proxy itself. + # We use X-Forwarded-For to get the original client IP for logging, + # but for *trust decisions* we must verify the proxy's IP, not the + # X-Forwarded-For header (which is trivially forgeable). + forwarded = handler.headers.get('X-Forwarded-For', '') + if forwarded: + # First entry is the original client + return forwarded.split(',')[0].strip() + return handler.client_address[0] + + +def _get_proxy_ip(handler) -> str: + """Return the immediate peer IP (the proxy, not the original client).""" + return handler.client_address[0] + + +def _strip_untrusted_headers(handler) -> None: + """Remove trusted auth headers when the request does not come from a + trusted proxy. This prevents header-forgery attacks where a client + directly connects and sends Remote-User themselves.""" + _populate_trusted_auth_cache() + if not _TRUSTED_AUTH_HEADER: + return + proxy_ip = _get_proxy_ip(handler) + if _is_trusted_proxy(proxy_ip): + return + # Strip the header to prevent forgery + if _TRUSTED_AUTH_HEADER in handler.headers: + del handler.headers[_TRUSTED_AUTH_HEADER] + if _TRUSTED_GROUPS_HEADER and _TRUSTED_GROUPS_HEADER in handler.headers: + del handler.headers[_TRUSTED_GROUPS_HEADER] + + +def get_trusted_user(handler) -> str | None: + """Extract the trusted username from the request headers. + + Only returns a value when: + 1. HERMES_WEBUI_TRUSTED_AUTH_HEADER is configured + 2. The request comes from a trusted proxy IP + 3. The header is present and non-empty + """ + _populate_trusted_auth_cache() + if not _TRUSTED_AUTH_HEADER: + return None + proxy_ip = _get_proxy_ip(handler) + if not _is_trusted_proxy(proxy_ip): + return None + user = handler.headers.get(_TRUSTED_AUTH_HEADER, '').strip() + return user if user else None + + +def get_trusted_groups(handler) -> list[str]: + """Extract trusted group names from the request headers. + + Expects comma-separated group names in the configured groups header. + Only returns values when the request comes from a trusted proxy IP. + """ + _populate_trusted_auth_cache() + if not _TRUSTED_GROUPS_HEADER: + return [] + proxy_ip = _get_proxy_ip(handler) + if not _is_trusted_proxy(proxy_ip): + return [] + raw = handler.headers.get(_TRUSTED_GROUPS_HEADER, '') + if not raw: + return [] + return [g.strip() for g in raw.split(',') if g.strip()] + + +def get_bound_profile_from_groups(handler) -> str | None: + """Map trusted group headers to a Hermes profile name. + + Returns the first matching profile from the group→profile map, or None + if no mapping matches. + """ + _populate_trusted_auth_cache() + if not _GROUP_PROFILE_MAP: + return None + groups = get_trusted_groups(handler) + for group in groups: + if group in _GROUP_PROFILE_MAP: + return _GROUP_PROFILE_MAP[group] + return None + + +def is_trusted_auth_enabled() -> bool: + """True if trusted header authentication is configured.""" + _populate_trusted_auth_cache() + return _TRUSTED_AUTH_HEADER is not None + + +def trusted_auth_logout_url() -> str | None: + """Return the configured logout URL for trusted auth (e.g. Authelia), or None.""" + _populate_trusted_auth_cache() + return _TRUSTED_AUTH_LOGOUT_URL + + +def create_trusted_session(handler) -> str: + """Create a session for a trusted-auth user and return the cookie value.""" + return create_session() + + def verify_password(plain: str) -> bool: """Verify a plaintext password against the stored hash. @@ -489,6 +687,23 @@ def parse_cookie(handler) -> str | None: def check_auth(handler, parsed) -> bool: """Check if request is authorized. Returns True if OK. If not authorized, sends 401 (API) or 302 redirect (page) and returns False.""" + # Strip forged headers before any auth decision + _strip_untrusted_headers(handler) + + # Trusted header auth: if configured and valid, auto-create session + if is_trusted_auth_enabled(): + trusted_user = get_trusted_user(handler) + if trusted_user: + # Valid trusted auth — ensure session exists + cookie_val = parse_cookie(handler) + if cookie_val and verify_session(cookie_val): + return True + # Auto-create session for trusted user + new_cookie = create_trusted_session(handler) + # Store cookie in handler for response (server.py will set it) + handler._trusted_auth_cookie = new_cookie + return True + if not is_auth_enabled(): return True # Public paths don't require auth diff --git a/api/routes.py b/api/routes.py index 0d8a1d0be0..04432ed400 100644 --- a/api/routes.py +++ b/api/routes.py @@ -4288,7 +4288,7 @@ def handle_get(handler, parsed) -> bool: return t(handler, _page, content_type="text/html; charset=utf-8") if parsed.path == "/api/auth/status": - from api.auth import _passkey_feature_flag_enabled, get_password_hash, is_auth_enabled, parse_cookie, verify_session + from api.auth import _passkey_feature_flag_enabled, get_password_hash, is_auth_enabled, parse_cookie, verify_session, is_trusted_auth_enabled, trusted_auth_logout_url from api.passkeys import registered_credentials logged_in = False @@ -4296,6 +4296,14 @@ def handle_get(handler, parsed) -> bool: if auth_enabled: cv = parse_cookie(handler) logged_in = bool(cv and verify_session(cv)) + # Trusted auth is also "auth enabled" for UI purposes + trusted_auth = is_trusted_auth_enabled() + if trusted_auth: + auth_enabled = True + # If trusted auth header is present, user is considered logged in + from api.auth import get_trusted_user + if get_trusted_user(handler): + logged_in = True passkey_flag = _passkey_feature_flag_enabled() passkeys = registered_credentials() if passkey_flag else [] password_auth_enabled = get_password_hash() is not None @@ -4307,6 +4315,8 @@ def handle_get(handler, parsed) -> bool: "passkeys_enabled": bool(passkeys), "passkeys_count": len(passkeys), "passkey_feature_flag": passkey_flag, + "trusted_auth_enabled": trusted_auth, + "trusted_auth_logout_url": trusted_auth_logout_url(), }) if parsed.path in ("/manifest.json", "/manifest.webmanifest"): @@ -7421,6 +7431,7 @@ def _llm_update_summary(system_prompt: str, user_prompt: str) -> str: if parsed.path == "/api/auth/logout": from api.auth import clear_auth_cookie, invalidate_session, parse_cookie + from api.helpers import get_profile_cookie_name cookie_val = parse_cookie(handler) if cookie_val: @@ -7432,6 +7443,13 @@ def _llm_update_summary(system_prompt: str, user_prompt: str) -> str: handler.send_header("Cache-Control", "no-store") _security_headers(handler) clear_auth_cookie(handler) + # Also clear the profile cookie so the next login starts fresh + import http.cookies as _hc + profile_cookie = _hc.SimpleCookie() + profile_cookie[get_profile_cookie_name()] = '' + profile_cookie[get_profile_cookie_name()]['path'] = '/' + profile_cookie[get_profile_cookie_name()]['max-age'] = '0' + handler.send_header('Set-Cookie', profile_cookie[get_profile_cookie_name()].OutputString()) handler.end_headers() handler.wfile.write(body) return True diff --git a/server.py b/server.py index 39ba81c3a1..7c29c70e31 100644 --- a/server.py +++ b/server.py @@ -171,6 +171,29 @@ def _build_csp_report_only_policy() -> str: from api.updates import WEBUI_VERSION +def _apply_trusted_auth_cookies(handler) -> None: + """Set auth and profile cookies for trusted-header-authenticated requests. + + Called after check_auth() succeeds. If the handler has a _trusted_auth_cookie + attribute (set by check_auth() when auto-creating a session), send it to the + client. Also set the bound profile cookie if group-based profile binding is + configured. + """ + # Set the auto-created session cookie + if getattr(handler, '_trusted_auth_cookie', None): + from api.auth import set_auth_cookie + set_auth_cookie(handler, handler._trusted_auth_cookie) + del handler._trusted_auth_cookie + + # Set the bound profile cookie if trusted auth is active + from api.auth import is_trusted_auth_enabled, get_bound_profile_from_groups + from api.helpers import build_profile_cookie + if is_trusted_auth_enabled(): + bound_profile = get_bound_profile_from_groups(handler) + if bound_profile: + handler.send_header('Set-Cookie', build_profile_cookie(bound_profile)) + + class QuietHTTPServer(ThreadingHTTPServer): """Custom HTTP server that silently handles common network errors.""" daemon_threads = True @@ -308,6 +331,7 @@ def do_GET(self) -> None: try: parsed = urlparse(self.path) if not check_auth(self, parsed): return + _apply_trusted_auth_cookies(self) result = handle_get(self, parsed) if result is False: return j(self, {'error': 'not found'}, status=404) @@ -347,6 +371,7 @@ def _handle_write(self, route_func) -> None: parsed.path == "/api/csp-report" and self.command == "POST" ) if not _is_csp_report_post and not check_auth(self, parsed): return + _apply_trusted_auth_cookies(self) result = route_func(self, parsed) if result is False: return j(self, {'error': 'not found'}, status=404) diff --git a/static/login.js b/static/login.js index 9a7cdbe272..e65834d5ed 100644 --- a/static/login.js +++ b/static/login.js @@ -9,6 +9,18 @@ document.addEventListener('DOMContentLoaded', function () { if (!form || !input) return; + // Trusted header auth: if the server says trusted auth is enabled and we're + // already authenticated (session cookie was auto-created), just redirect. + fetch('api/auth/status', { credentials: 'include' }) + .then(function (r) { return r.ok ? r.json() : null; }) + .then(function (s) { + if (s && s.trusted_auth_enabled && s.logged_in) { + // Already authenticated via trusted header — redirect to app + window.location.href = _safeNextPath(); + } + }) + .catch(function () {}); + var invalidPw = form.getAttribute('data-invalid-pw') || 'Invalid password'; var connFailed = form.getAttribute('data-conn-failed') || 'Connection failed'; diff --git a/static/panels.js b/static/panels.js index a244470191..8659c53264 100644 --- a/static/panels.js +++ b/static/panels.js @@ -7672,8 +7672,15 @@ async function saveSettings(andClose){ async function signOut(){ try{ + var statusRes = await fetch('/api/auth/status', { credentials: 'include' }); + var status = statusRes.ok ? await statusRes.json() : {}; await api('/api/auth/logout',{method:'POST',body:'{}'}); - window.location.href='login'; + // If trusted auth with a logout URL (e.g. Authelia), redirect there + if (status.trusted_auth_enabled && status.trusted_auth_logout_url) { + window.location.href = status.trusted_auth_logout_url; + } else { + window.location.href='login'; + } }catch(e){ showToast(t('sign_out_failed')+e.message); }