From 906ac6758d9da8910f28bcaa3d94451db9866799 Mon Sep 17 00:00:00 2001 From: Michael Van Delft Date: Wed, 26 Feb 2025 18:56:58 +0800 Subject: [PATCH] When USE_JWT is set, log users in to the django admin console as well return the JWT (#361) * Move use_jwt block below login block * Update test to verify the user is logged in to both the Single Page App and the Django admin console * Fix linting --- django_saml2_auth/tests/test_saml.py | 76 +++++++++++++++++++++++++++- django_saml2_auth/views.py | 37 +++++++------- 2 files changed, 94 insertions(+), 19 deletions(-) diff --git a/django_saml2_auth/tests/test_saml.py b/django_saml2_auth/tests/test_saml.py index ce8f813..a81198e 100644 --- a/django_saml2_auth/tests/test_saml.py +++ b/django_saml2_auth/tests/test_saml.py @@ -9,10 +9,11 @@ from django.contrib.sessions.middleware import SessionMiddleware from unittest.mock import MagicMock from django.http import HttpRequest -from django.test.client import RequestFactory +from django.test.client import RequestFactory, Client from django.urls import NoReverseMatch from saml2 import BINDING_HTTP_POST +from django_saml2_auth.errors import INACTIVE_USER from django_saml2_auth.exceptions import SAMLAuthError from django_saml2_auth.saml import ( decode_saml_response, @@ -771,3 +772,76 @@ def test_get_metadata_success_with_custom_trigger(settings: SettingsWrapper): get_metadata(domain="not-mapped-example.com") assert str(exc_info.value) == "Domain not-mapped-example.com not mapped!" + + +@pytest.mark.django_db +@responses.activate +def test_acs_view_with_use_jwt_both_redirects_user_and_sets_cookies( + settings: SettingsWrapper, + monkeypatch: "MonkeyPatch", # type: ignore # noqa: F821 +): + """Test Acs view when USE_JWT is set, the user is redirected and cookies are set""" + responses.add(responses.GET, METADATA_URL1, body=METADATA1) + settings.SAML2_AUTH = { + "DEFAULT_NEXT_URL": "default_next_url", + "USE_JWT": True, + "JWT_SECRET": "JWT_SECRET", + "JWT_ALGORITHM": "HS256", + "FRONTEND_URL": "https://app.example.com/account/login/saml", + "TRIGGER": { + "BEFORE_LOGIN": None, + "AFTER_LOGIN": None, + "GET_METADATA_AUTO_CONF_URLS": GET_METADATA_AUTO_CONF_URLS, + }, + } + monkeypatch.setattr( + Saml2Client, "parse_authn_request_response", mock_parse_authn_request_response + ) + client = Client() + response = client.post("/acs/", {"SAMLResponse": "SAML RESPONSE", "RelayState": "/"}) + + # Response includes a redirect to the single page app, with the JWT in the query string. + assert response.status_code == 302 + assert "https://app.example.com/account/login/saml?token=eyJ" in getattr(response, "url") + # Response includes a session id cookie (i.e. the user is logged in to the django admin console) + assert response.cookies.get("sessionid") + + +@pytest.mark.django_db +@responses.activate +def test_acs_view_use_jwt_set_inactive_user( + settings: SettingsWrapper, + monkeypatch: "MonkeyPatch", # type: ignore # noqa: F821 +): + """Test Acs view when USE_JWT is set that inactive users can not log in""" + responses.add(responses.GET, METADATA_URL1, body=METADATA1) + settings.SAML2_AUTH = { + "DEFAULT_NEXT_URL": "default_next_url", + "USE_JWT": True, + "JWT_SECRET": "JWT_SECRET", + "JWT_ALGORITHM": "HS256", + "FRONTEND_URL": "https://app.example.com/account/login/saml", + "TRIGGER": { + "BEFORE_LOGIN": None, + "AFTER_LOGIN": None, + "GET_METADATA_AUTO_CONF_URLS": GET_METADATA_AUTO_CONF_URLS, + }, + } + post_request = RequestFactory().post(METADATA_URL1, {"SAMLResponse": "SAML RESPONSE"}) + monkeypatch.setattr( + Saml2Client, "parse_authn_request_response", mock_parse_authn_request_response + ) + created, mock_user = user.get_or_create_user( + {"username": "test@example.com", "first_name": "John", "last_name": "Doe"} + ) + mock_user.is_active = False + mock_user.save() + monkeypatch.setattr(user, "get_or_create_user", (created, mock_user)) + + middleware = SessionMiddleware(MagicMock()) + middleware.process_request(post_request) + post_request.session.save() + + result = acs(post_request) + assert result.status_code == 500 + assert f"Error code: {INACTIVE_USER}" in result.content.decode() diff --git a/django_saml2_auth/views.py b/django_saml2_auth/views.py index b1ad374..05f9462 100644 --- a/django_saml2_auth/views.py +++ b/django_saml2_auth/views.py @@ -149,24 +149,6 @@ def acs(request: HttpRequest): request.session.flush() - use_jwt = dictor(saml2_auth_settings, "USE_JWT", False) - if use_jwt and target_user.is_active: - # Create a new JWT token for IdP-initiated login (acs) - jwt_token = create_custom_or_default_jwt(target_user) - custom_token_query_trigger = dictor(saml2_auth_settings, "TRIGGER.CUSTOM_TOKEN_QUERY") - if custom_token_query_trigger: - query = run_hook(custom_token_query_trigger, jwt_token) - else: - query = f"?token={jwt_token}" - - # Use JWT auth to send token to frontend - frontend_url = dictor(saml2_auth_settings, "FRONTEND_URL", next_url) - custom_frontend_url_trigger = dictor(saml2_auth_settings, "TRIGGER.GET_CUSTOM_FRONTEND_URL") - if custom_frontend_url_trigger: - frontend_url = run_hook(custom_frontend_url_trigger, relay_state) # type: ignore - - return HttpResponseRedirect(frontend_url + query) - if target_user.is_active: # Try to load from the `AUTHENTICATION_BACKENDS` setting in settings.py if hasattr(settings, "AUTHENTICATION_BACKENDS") and settings.AUTHENTICATION_BACKENDS: @@ -190,6 +172,25 @@ def acs(request: HttpRequest): }, ) + use_jwt = dictor(saml2_auth_settings, "USE_JWT", False) + if use_jwt: + # Create a new JWT token for IdP-initiated login (acs) + jwt_token = create_custom_or_default_jwt(target_user) + custom_token_query_trigger = dictor(saml2_auth_settings, "TRIGGER.CUSTOM_TOKEN_QUERY") + if custom_token_query_trigger: + query = run_hook(custom_token_query_trigger, jwt_token) + else: + query = f"?token={jwt_token}" + + # Use JWT auth to send token to frontend + frontend_url = dictor(saml2_auth_settings, "FRONTEND_URL", next_url) + custom_frontend_url_trigger = dictor(saml2_auth_settings, "TRIGGER.GET_CUSTOM_FRONTEND_URL") + if custom_frontend_url_trigger: + frontend_url = run_hook(custom_frontend_url_trigger, relay_state) # type: ignore + + return HttpResponseRedirect(frontend_url + query) + + def redirect(redirect_url: Optional[str] = None) -> HttpResponseRedirect: """Redirect to the redirect_url or the root page.