Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 11 additions & 45 deletions common/djangoapps/third_party_auth/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,7 @@ def B(*args, **kwargs):
is_saml_provider,
user_exists,
)
from common.djangoapps.third_party_auth.toggles import (
is_saml_provider_site_fallback_enabled,
is_tpa_next_url_on_dispatch_enabled,
)
from common.djangoapps.third_party_auth.toggles import is_tpa_next_url_on_dispatch_enabled
from common.djangoapps.track import segment
from common.djangoapps.util.json_request import JsonResponse

Expand Down Expand Up @@ -363,10 +360,10 @@ def get_complete_url(backend_name):
ValueError: if no provider is enabled with the given backend_name.
"""
if not any(provider.Registry.get_enabled_by_backend_name(backend_name)):
# When the SAML site-fallback flag is on, the provider may not be visible to the
# site-filtered registry even though SAML auth already completed via a
# site-independent lookup. Allow get_complete_url to proceed in that case.
if not (is_saml_provider_site_fallback_enabled() and backend_name == 'tpa-saml'):
# For tpa-saml, the provider may not be visible to the site-filtered registry
# even though SAML auth already completed via a site-independent lookup.
# Allow get_complete_url to proceed in that case.
if backend_name != 'tpa-saml':
raise ValueError('Provider with backend %s not enabled' % backend_name)

return _get_url('social:complete', backend_name)
Expand Down Expand Up @@ -609,45 +606,19 @@ def should_force_account_creation():
return (current_provider and
(current_provider.skip_email_verification or current_provider.send_to_registration_first))

def is_provider_saml():
""" Verify that the third party provider uses SAML """
current_provider = provider.Registry.get_from_pipeline({'backend': current_partial.backend, 'kwargs': kwargs})
saml_providers_list = list(provider.Registry.get_enabled_by_backend_name('tpa-saml'))
return (current_provider and
current_provider.slug in [saml_provider.slug for saml_provider in saml_providers_list])

if current_partial:
strategy.session_set('partial_pipeline_token_', current_partial.token)
strategy.storage.partial.store(current_partial)

if not user:
# Use only email for user existence check in case of saml provider
_is_saml = is_provider_saml()
_provider_obj = provider.Registry.get_from_pipeline({'backend': current_partial.backend, 'kwargs': kwargs})
logger.info(
'[THIRD_PARTY_AUTH] ensure_user_information: auth_entry=%s backend=%s is_provider_saml=%s '
'current_provider=%s skip_email_verification=%s send_to_registration_first=%s '
'email=%s kwargs_response_keys=%s',
auth_entry,
current_partial.backend,
_is_saml,
_provider_obj.provider_id if _provider_obj else None,
_provider_obj.skip_email_verification if _provider_obj else None,
_provider_obj.send_to_registration_first if _provider_obj else None,
details.get('email') if details else None,
list((kwargs.get('response') or {}).keys()),
)
if _is_saml:
# Use only email for user existence check in case of saml provider.
# Check the backend name directly rather than the site-filtered registry,
# since the provider may only be visible via the site-independent fallback.
if current_partial.backend == 'tpa-saml':
user_details = {'email': details.get('email')} if details else None
else:
user_details = details
_user_exists = user_exists(user_details or {})
logger.info(
'[THIRD_PARTY_AUTH] ensure_user_information: user_exists=%s user_details_email=%s',
_user_exists,
(user_details or {}).get('email'),
)
if _user_exists:
if user_exists(user_details or {}):
# User has not already authenticated and the details sent over from
# identity provider belong to an existing user.
logger.info('[THIRD_PARTY_AUTH] ensure_user_information: dispatching to login (user exists)')
Expand All @@ -658,12 +629,7 @@ def is_provider_saml():
elif auth_entry == AUTH_ENTRY_LOGIN:
# User has authenticated with the third party provider but we don't know which edX
# account corresponds to them yet, if any.
_force = should_force_account_creation()
logger.info(
'[THIRD_PARTY_AUTH] ensure_user_information: AUTH_ENTRY_LOGIN should_force_account_creation=%s',
_force,
)
if _force:
if should_force_account_creation():
return dispatch_to_register()
logger.info('[THIRD_PARTY_AUTH] ensure_user_information: dispatching to login (no force create)')
return dispatch_to_login()
Expand Down
3 changes: 1 addition & 2 deletions common/djangoapps/third_party_auth/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
SAMLConfiguration,
SAMLProviderConfig
)
from common.djangoapps.third_party_auth.toggles import is_saml_provider_site_fallback_enabled


class Registry:
Expand Down Expand Up @@ -104,7 +103,7 @@ def get_from_pipeline(cls, running_pipeline):
# won't yield it — but the SAML handshake already completed. Look up the provider
# directly by idp_name so that pipeline steps like should_force_account_creation()
# can still read provider flags.
if is_saml_provider_site_fallback_enabled() and running_pipeline.get('backend') == 'tpa-saml':
if running_pipeline.get('backend') == 'tpa-saml':
try:
idp_name = running_pipeline['kwargs']['response']['idp_name']
return SAMLProviderConfig.current(idp_name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,15 @@ def test_complete_url_raises_value_error_if_provider_not_enabled(self):
with pytest.raises(ValueError):
pipeline.get_complete_url(provider_name)

def test_complete_url_does_not_raise_for_tpa_saml_with_no_enabled_providers(self):
# tpa-saml is allowed to proceed even when no providers are visible in the
# site-filtered registry, because the SAML handshake may have completed via
# a site-independent lookup.
with mock.patch.object(provider.Registry, 'get_enabled_by_backend_name', return_value=iter([])):
url = pipeline.get_complete_url('tpa-saml')
assert url.startswith('/auth/complete')
assert 'tpa-saml' in url

def test_complete_url_returns_expected_format(self):
complete_url = pipeline.get_complete_url(self.enabled_provider.backend_name)

Expand Down Expand Up @@ -361,22 +370,19 @@ def test_redirect_for_saml_based_on_email_only(self, email, expected_redirect_ur
)
with mock.patch('common.djangoapps.third_party_auth.pipeline.provider.Registry.get_from_pipeline') as get_from_pipeline: # lint-amnesty, pylint: disable=line-too-long
get_from_pipeline.return_value = saml_provider
with mock.patch(
'common.djangoapps.third_party_auth.pipeline.provider.Registry.get_enabled_by_backend_name'
) as enabled_saml_providers:
enabled_saml_providers.return_value = [saml_provider, ] if is_saml else []
with mock.patch('social_core.pipeline.partial.partial_prepare') as partial_prepare:
partial_prepare.return_value = mock.MagicMock(token='')
strategy = mock.MagicMock()
response = pipeline.ensure_user_information(
strategy=strategy,
backend=None,
auth_entry=pipeline.AUTH_ENTRY_LOGIN,
pipeline_index=0,
details={'username': self.user.username, 'email': email}
)
assert response.status_code == 302
assert response.url == expected_redirect_url
with mock.patch('social_core.pipeline.partial.partial_prepare') as partial_prepare:
backend = 'tpa-saml' if is_saml else 'oa2-google-oauth2'
partial_prepare.return_value = mock.MagicMock(token='', backend=backend)
strategy = mock.MagicMock()
response = pipeline.ensure_user_information(
strategy=strategy,
backend=None,
auth_entry=pipeline.AUTH_ENTRY_LOGIN,
pipeline_index=0,
details={'username': self.user.username, 'email': email}
)
assert response.status_code == 302
assert response.url == expected_redirect_url


@ddt.ddt
Expand Down
30 changes: 0 additions & 30 deletions common/djangoapps/third_party_auth/toggles.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,33 +65,3 @@ def is_tpa_next_url_on_dispatch_enabled():
when dispatching to login/register pages.
"""
return TPA_NEXT_URL_ON_DISPATCH_FLAG.is_enabled()


# .. toggle_name: third_party_auth.saml_provider_site_fallback
# .. toggle_implementation: WaffleFlag
# .. toggle_default: False
# .. toggle_description: When enabled, Registry.get_from_pipeline() will fall back to a
# site-independent SAMLProviderConfig lookup when the site-filtered registry returns no
# match for a running SAML pipeline. This handles cases where the SAMLProviderConfig or
# SAMLConfiguration is associated with a different Django site than the one currently
# serving the request, while SAML auth itself already completed (SAMLAuthBackend.get_idp()
# has no site check). Without this flag, pipeline steps such as should_force_account_creation()
# cannot read provider flags (e.g. send_to_registration_first), causing new users to land on
# the login page instead of registration.
# .. toggle_use_cases: temporary
# .. toggle_creation_date: 2026-02-19
# .. toggle_target_removal_date: 2026-06-01
# .. toggle_warning: The underlying site configuration mismatch should still be fixed in Django
# admin (SAMLConfiguration and SAMLProviderConfig must reference the correct site). This flag
# is a temporary workaround until that is resolved.
SAML_PROVIDER_SITE_FALLBACK_FLAG = WaffleFlag(
f'{THIRD_PARTY_AUTH_NAMESPACE}.saml_provider_site_fallback', __name__
)


def is_saml_provider_site_fallback_enabled():
"""
Returns True if get_from_pipeline() should fall back to a site-independent
SAMLProviderConfig lookup when the site-filtered registry finds no match.
"""
return SAML_PROVIDER_SITE_FALLBACK_FLAG.is_enabled()
6 changes: 6 additions & 0 deletions lms/envs/devstack.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,12 @@ def should_show_debug_toolbar(request): # lint-amnesty, pylint: disable=missing
#####################################################################

# django-session-cookie middleware
# Devstack uses Lax rather than the production 'None' (which requires Secure/HTTPS).
# Note: SP-initiated SAML flows POST the SAMLResponse cross-site back to
# /auth/complete/tpa-saml/, and SameSite=Lax blocks that POST. Testing
# SP-initiated SAML locally therefore requires either running devstack over HTTPS
# (so SameSite=None;Secure can be used) or enabling the browser flag that lifts
# the SameSite-without-Secure restriction (e.g. chrome://flags/#cookies-without-same-site-must-be-secure).
DCS_SESSION_COOKIE_SAMESITE = 'Lax'
DCS_SESSION_COOKIE_SAMESITE_FORCE_ALL = True

Expand Down
19 changes: 17 additions & 2 deletions openedx/core/djangoapps/user_authn/views/login_form.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,14 @@ def login_and_registration_form(request, initial_mode="login"):
saml_provider, __ = third_party_auth.utils.is_saml_provider(
running_pipeline.get('backend'), running_pipeline.get('kwargs')
)
# Also check for the partial token directly in the session. In deployments with
# DB read replicas, pipeline.get() can return None due to replication lag even
# when the pipeline IS running (token is in session but the partial object hasn't
# propagated to the replica yet). Checking the session key avoids a false None.
pipeline_token_in_session = not running_pipeline and bool(
request.session.get('partial_pipeline_token') or
request.session.get('partial_pipeline_token_')
)

# Our ?next= URL may itself contain a parameter 'tpa_hint=x' that we need to check.
# If present, we display a login page focused on third-party auth with that provider.
Expand All @@ -181,8 +189,9 @@ def login_and_registration_form(request, initial_mode="login"):
if 'tpa_hint' in next_args:
provider_id = next_args['tpa_hint'][0]
tpa_hint_provider = third_party_auth.provider.Registry.get(provider_id=provider_id)
pipeline_in_progress = running_pipeline or pipeline_token_in_session
if tpa_hint_provider:
if tpa_hint_provider.skip_hinted_login_dialog and not running_pipeline:
if tpa_hint_provider.skip_hinted_login_dialog and not pipeline_in_progress:
# Forward the user directly to the provider's login URL when the provider is configured
# to skip the dialog. Do not redirect if a TPA pipeline is already running, as that
# would cause an infinite loop (e.g. new SAML users dispatched back to /login).
Expand All @@ -194,7 +203,13 @@ def login_and_registration_form(request, initial_mode="login"):
pipeline.get_login_url(provider_id, auth_entry, redirect_url=redirect_to)
)
third_party_auth_hint = provider_id
initial_mode = "hinted_login"
# Only switch to hinted_login if no pipeline is currently running.
# If a pipeline IS running (e.g. a new SAML user was dispatched back
# here after authenticating at the IdP), keep the original mode so the
# register/login form is shown instead of re-offering the IdP button,
# which would restart the SAML flow and create a redirect loop.
if not pipeline_in_progress:
initial_mode = "hinted_login"
except (KeyError, ValueError, IndexError) as ex:
log.exception("Unknown tpa_hint provider: %s", ex)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,30 @@ def test_hinted_login_dialog_disabled_with_running_pipeline(self, url_name, auth
# The form should be rendered (200), not a redirect to the provider's auth URL.
self.assertEqual(response.status_code, 200)

@ddt.data(
('signin_user', 'login'),
('register_user', 'register'),
)
@ddt.unpack
def test_hinted_login_dialog_disabled_with_session_token_only(self, url_name, auth_entry): # pylint: disable=unused-argument
"""
Test that skip_hinted_login_dialog does NOT redirect when a partial pipeline token
exists in the session even if pipeline.get() returns None (e.g. due to DB read-replica
replication lag where the partial object hasn't propagated yet).
"""
self.google_provider.skip_hinted_login_dialog = True
self.google_provider.save()
params = [("next", "/courses/something/?tpa_hint=oa2-google-oauth2")]
pipeline_target = "openedx.core.djangoapps.user_authn.views.login_form.third_party_auth.pipeline"
# pipeline.get() returns None (replica lag), but session has the partial token
with mock.patch(f"{pipeline_target}.get", return_value=None):
session = self.client.session
session['partial_pipeline_token'] = 'some-token-value'
session.save()
response = self.client.get(reverse(url_name), params, HTTP_ACCEPT="text/html")
# The form should be rendered (200), not a redirect to the provider's auth URL.
self.assertEqual(response.status_code, 200)

@override_settings(FEATURES=dict(settings.FEATURES, THIRD_PARTY_AUTH_HINT='oa2-google-oauth2'))
@ddt.data(
'signin_user',
Expand Down
Loading