Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 215 additions & 0 deletions api/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,204 @@ def is_auth_enabled() -> bool:
return is_password_auth_enabled() or are_passkeys_enabled()


# ── Trusted header authentication (issue #3351) ─────────────────────────────
# When a reverse proxy (Authelia, Authentik, nginx auth_request, etc.) handles
# authentication, the WebUI can trust headers sent by that proxy to identify
# the user and optionally bind them to a specific Hermes profile.
#
# Security-critical: the proxy IP MUST be allowlisted via
# HERMES_WEBUI_TRUSTED_AUTH_PROXY_IPS. Without this, any client can forge
# the trusted header and impersonate any user.
# ────────────────────────────────────────────────────────────────────────────

_TRUSTED_AUTH_HEADER: str | None = None
_TRUSTED_GROUPS_HEADER: str | None = None
_GROUP_PROFILE_MAP: dict[str, str] | None = None
_TRUSTED_PROXY_IPS: list | None = None
_TRUSTED_AUTH_LOGOUT_URL: str | None = None
_TRUSTED_AUTH_CACHE_LOCK = threading.Lock()
_TRUSTED_AUTH_CACHE_POPULATED = False


def _populate_trusted_auth_cache() -> None:
"""Parse env vars once and cache the result."""
global _TRUSTED_AUTH_CACHE_POPULATED
global _TRUSTED_AUTH_HEADER, _TRUSTED_GROUPS_HEADER
global _GROUP_PROFILE_MAP, _TRUSTED_PROXY_IPS, _TRUSTED_AUTH_LOGOUT_URL
with _TRUSTED_AUTH_CACHE_LOCK:
if _TRUSTED_AUTH_CACHE_POPULATED:
return
_TRUSTED_AUTH_HEADER = os.getenv('HERMES_WEBUI_TRUSTED_AUTH_HEADER', '').strip() or None
_TRUSTED_GROUPS_HEADER = os.getenv('HERMES_WEBUI_TRUSTED_GROUPS_HEADER', '').strip() or None
_TRUSTED_AUTH_LOGOUT_URL = os.getenv('HERMES_WEBUI_TRUSTED_AUTH_LOGOUT_URL', '').strip() or None

# Parse group→profile map: "hermes_devops=devops,hermes_coworkers=coworkers"
raw_map = os.getenv('HERMES_WEBUI_GROUP_PROFILE_MAP', '').strip()
if raw_map:
_GROUP_PROFILE_MAP = {}
for segment in raw_map.split(','):
segment = segment.strip()
if '=' in segment:
group, profile = segment.split('=', 1)
_GROUP_PROFILE_MAP[group.strip()] = profile.strip()
else:
_GROUP_PROFILE_MAP = None

# Parse proxy IP allowlist (CIDR or plain IPs)
raw_ips = os.getenv('HERMES_WEBUI_TRUSTED_AUTH_PROXY_IPS', '').strip()
if raw_ips:
_TRUSTED_PROXY_IPS = _parse_trusted_proxy_ips(raw_ips)
else:
# Default: loopback only — safe for local reverse-proxy setups
_TRUSTED_PROXY_IPS = _parse_trusted_proxy_ips('127.0.0.1,::1')

_TRUSTED_AUTH_CACHE_POPULATED = True


def _parse_trusted_proxy_ips(raw: str) -> list:
"""Parse a comma-separated list of IPs or CIDR ranges.

Supports IPv4, IPv6, and CIDR notation (e.g. 10.0.0.0/8, 192.168.1.0/24).
Returns a list of (network_address, prefix_len) tuples for fast matching.
"""
import ipaddress
results = []
for part in raw.split(','):
part = part.strip()
if not part:
continue
try:
network = ipaddress.ip_network(part, strict=False)
results.append((network.network_address, network.prefixlen))
except ValueError:
logger.warning("Ignoring invalid trusted proxy IP/CIDR: %r", part)
return results


def _is_trusted_proxy(ip_str: str) -> bool:
"""Return True if the given IP is in the trusted proxy allowlist."""
_populate_trusted_auth_cache()
if not _TRUSTED_PROXY_IPS:
return False
import ipaddress
try:
client_ip = ipaddress.ip_address(ip_str)
except ValueError:
return False
for network_addr, prefix_len in _TRUSTED_PROXY_IPS:
try:
network = ipaddress.ip_network(f"{network_addr}/{prefix_len}", strict=False)
if client_ip in network:
return True
except ValueError:
continue
return False


def _get_client_ip(handler) -> str:
"""Extract the client IP from the handler, preferring X-Forwarded-For."""
# In reverse-proxy setups, the immediate peer is the proxy itself.
# We use X-Forwarded-For to get the original client IP for logging,
# but for *trust decisions* we must verify the proxy's IP, not the
# X-Forwarded-For header (which is trivially forgeable).
forwarded = handler.headers.get('X-Forwarded-For', '')
if forwarded:
# First entry is the original client
return forwarded.split(',')[0].strip()
return handler.client_address[0]


def _get_proxy_ip(handler) -> str:
"""Return the immediate peer IP (the proxy, not the original client)."""
return handler.client_address[0]


def _strip_untrusted_headers(handler) -> None:
"""Remove trusted auth headers when the request does not come from a
trusted proxy. This prevents header-forgery attacks where a client
directly connects and sends Remote-User themselves."""
_populate_trusted_auth_cache()
if not _TRUSTED_AUTH_HEADER:
return
proxy_ip = _get_proxy_ip(handler)
if _is_trusted_proxy(proxy_ip):
return
# Strip the header to prevent forgery
if _TRUSTED_AUTH_HEADER in handler.headers:
del handler.headers[_TRUSTED_AUTH_HEADER]
if _TRUSTED_GROUPS_HEADER and _TRUSTED_GROUPS_HEADER in handler.headers:
del handler.headers[_TRUSTED_GROUPS_HEADER]


def get_trusted_user(handler) -> str | None:
"""Extract the trusted username from the request headers.

Only returns a value when:
1. HERMES_WEBUI_TRUSTED_AUTH_HEADER is configured
2. The request comes from a trusted proxy IP
3. The header is present and non-empty
"""
_populate_trusted_auth_cache()
if not _TRUSTED_AUTH_HEADER:
return None
proxy_ip = _get_proxy_ip(handler)
if not _is_trusted_proxy(proxy_ip):
return None
user = handler.headers.get(_TRUSTED_AUTH_HEADER, '').strip()
return user if user else None


def get_trusted_groups(handler) -> list[str]:
"""Extract trusted group names from the request headers.

Expects comma-separated group names in the configured groups header.
Only returns values when the request comes from a trusted proxy IP.
"""
_populate_trusted_auth_cache()
if not _TRUSTED_GROUPS_HEADER:
return []
proxy_ip = _get_proxy_ip(handler)
if not _is_trusted_proxy(proxy_ip):
return []
raw = handler.headers.get(_TRUSTED_GROUPS_HEADER, '')
if not raw:
return []
return [g.strip() for g in raw.split(',') if g.strip()]


def get_bound_profile_from_groups(handler) -> str | None:
"""Map trusted group headers to a Hermes profile name.

Returns the first matching profile from the group→profile map, or None
if no mapping matches.
"""
_populate_trusted_auth_cache()
if not _GROUP_PROFILE_MAP:
return None
groups = get_trusted_groups(handler)
for group in groups:
if group in _GROUP_PROFILE_MAP:
return _GROUP_PROFILE_MAP[group]
return None


def is_trusted_auth_enabled() -> bool:
"""True if trusted header authentication is configured."""
_populate_trusted_auth_cache()
return _TRUSTED_AUTH_HEADER is not None


def trusted_auth_logout_url() -> str | None:
"""Return the configured logout URL for trusted auth (e.g. Authelia), or None."""
_populate_trusted_auth_cache()
return _TRUSTED_AUTH_LOGOUT_URL


def create_trusted_session(handler) -> str:
"""Create a session for a trusted-auth user and return the cookie value."""
return create_session()


def verify_password(plain: str) -> bool:
"""Verify a plaintext password against the stored hash.

Expand Down Expand Up @@ -489,6 +687,23 @@ def parse_cookie(handler) -> str | None:
def check_auth(handler, parsed) -> bool:
"""Check if request is authorized. Returns True if OK.
If not authorized, sends 401 (API) or 302 redirect (page) and returns False."""
# Strip forged headers before any auth decision
_strip_untrusted_headers(handler)

# Trusted header auth: if configured and valid, auto-create session
if is_trusted_auth_enabled():
trusted_user = get_trusted_user(handler)
if trusted_user:
# Valid trusted auth — ensure session exists
cookie_val = parse_cookie(handler)
if cookie_val and verify_session(cookie_val):
return True
# Auto-create session for trusted user
new_cookie = create_trusted_session(handler)
# Store cookie in handler for response (server.py will set it)
handler._trusted_auth_cookie = new_cookie
return True

if not is_auth_enabled():
return True
# Public paths don't require auth
Expand Down
20 changes: 19 additions & 1 deletion api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4288,14 +4288,22 @@ def handle_get(handler, parsed) -> bool:
return t(handler, _page, content_type="text/html; charset=utf-8")

if parsed.path == "/api/auth/status":
from api.auth import _passkey_feature_flag_enabled, get_password_hash, is_auth_enabled, parse_cookie, verify_session
from api.auth import _passkey_feature_flag_enabled, get_password_hash, is_auth_enabled, parse_cookie, verify_session, is_trusted_auth_enabled, trusted_auth_logout_url
from api.passkeys import registered_credentials

logged_in = False
auth_enabled = is_auth_enabled()
if auth_enabled:
cv = parse_cookie(handler)
logged_in = bool(cv and verify_session(cv))
# Trusted auth is also "auth enabled" for UI purposes
trusted_auth = is_trusted_auth_enabled()
if trusted_auth:
auth_enabled = True
# If trusted auth header is present, user is considered logged in
from api.auth import get_trusted_user
if get_trusted_user(handler):
logged_in = True
passkey_flag = _passkey_feature_flag_enabled()
passkeys = registered_credentials() if passkey_flag else []
password_auth_enabled = get_password_hash() is not None
Expand All @@ -4307,6 +4315,8 @@ def handle_get(handler, parsed) -> bool:
"passkeys_enabled": bool(passkeys),
"passkeys_count": len(passkeys),
"passkey_feature_flag": passkey_flag,
"trusted_auth_enabled": trusted_auth,
"trusted_auth_logout_url": trusted_auth_logout_url(),
})

if parsed.path in ("/manifest.json", "/manifest.webmanifest"):
Expand Down Expand Up @@ -7421,6 +7431,7 @@ def _llm_update_summary(system_prompt: str, user_prompt: str) -> str:

if parsed.path == "/api/auth/logout":
from api.auth import clear_auth_cookie, invalidate_session, parse_cookie
from api.helpers import get_profile_cookie_name

cookie_val = parse_cookie(handler)
if cookie_val:
Expand All @@ -7432,6 +7443,13 @@ def _llm_update_summary(system_prompt: str, user_prompt: str) -> str:
handler.send_header("Cache-Control", "no-store")
_security_headers(handler)
clear_auth_cookie(handler)
# Also clear the profile cookie so the next login starts fresh
import http.cookies as _hc
profile_cookie = _hc.SimpleCookie()
profile_cookie[get_profile_cookie_name()] = ''
profile_cookie[get_profile_cookie_name()]['path'] = '/'
profile_cookie[get_profile_cookie_name()]['max-age'] = '0'
handler.send_header('Set-Cookie', profile_cookie[get_profile_cookie_name()].OutputString())
handler.end_headers()
handler.wfile.write(body)
return True
Expand Down
25 changes: 25 additions & 0 deletions server.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,29 @@ def _build_csp_report_only_policy() -> str:
from api.updates import WEBUI_VERSION


def _apply_trusted_auth_cookies(handler) -> None:
"""Set auth and profile cookies for trusted-header-authenticated requests.

Called after check_auth() succeeds. If the handler has a _trusted_auth_cookie
attribute (set by check_auth() when auto-creating a session), send it to the
client. Also set the bound profile cookie if group-based profile binding is
configured.
"""
# Set the auto-created session cookie
if getattr(handler, '_trusted_auth_cookie', None):
from api.auth import set_auth_cookie
set_auth_cookie(handler, handler._trusted_auth_cookie)
del handler._trusted_auth_cookie

# Set the bound profile cookie if trusted auth is active
from api.auth import is_trusted_auth_enabled, get_bound_profile_from_groups
from api.helpers import build_profile_cookie
if is_trusted_auth_enabled():
bound_profile = get_bound_profile_from_groups(handler)
if bound_profile:
handler.send_header('Set-Cookie', build_profile_cookie(bound_profile))


class QuietHTTPServer(ThreadingHTTPServer):
"""Custom HTTP server that silently handles common network errors."""
daemon_threads = True
Expand Down Expand Up @@ -308,6 +331,7 @@ def do_GET(self) -> None:
try:
parsed = urlparse(self.path)
if not check_auth(self, parsed): return
_apply_trusted_auth_cookies(self)
result = handle_get(self, parsed)
if result is False:
return j(self, {'error': 'not found'}, status=404)
Expand Down Expand Up @@ -347,6 +371,7 @@ def _handle_write(self, route_func) -> None:
parsed.path == "/api/csp-report" and self.command == "POST"
)
if not _is_csp_report_post and not check_auth(self, parsed): return
_apply_trusted_auth_cookies(self)
result = route_func(self, parsed)
if result is False:
return j(self, {'error': 'not found'}, status=404)
Expand Down
12 changes: 12 additions & 0 deletions static/login.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@ document.addEventListener('DOMContentLoaded', function () {

if (!form || !input) return;

// Trusted header auth: if the server says trusted auth is enabled and we're
// already authenticated (session cookie was auto-created), just redirect.
fetch('api/auth/status', { credentials: 'include' })
.then(function (r) { return r.ok ? r.json() : null; })
.then(function (s) {
if (s && s.trusted_auth_enabled && s.logged_in) {
// Already authenticated via trusted header — redirect to app
window.location.href = _safeNextPath();
}
})
.catch(function () {});

var invalidPw = form.getAttribute('data-invalid-pw') || 'Invalid password';
var connFailed = form.getAttribute('data-conn-failed') || 'Connection failed';

Expand Down
9 changes: 8 additions & 1 deletion static/panels.js
Original file line number Diff line number Diff line change
Expand Up @@ -7672,8 +7672,15 @@ async function saveSettings(andClose){

async function signOut(){
try{
var statusRes = await fetch('/api/auth/status', { credentials: 'include' });
var status = statusRes.ok ? await statusRes.json() : {};
await api('/api/auth/logout',{method:'POST',body:'{}'});
window.location.href='login';
// If trusted auth with a logout URL (e.g. Authelia), redirect there
if (status.trusted_auth_enabled && status.trusted_auth_logout_url) {
window.location.href = status.trusted_auth_logout_url;
} else {
window.location.href='login';
}
}catch(e){
showToast(t('sign_out_failed')+e.message);
}
Expand Down