diff --git a/common/djangoapps/third_party_auth/samlproviderconfig/tests/test_samlproviderconfig.py b/common/djangoapps/third_party_auth/samlproviderconfig/tests/test_samlproviderconfig.py deleted file mode 100644 index c8e8487b659d..000000000000 --- a/common/djangoapps/third_party_auth/samlproviderconfig/tests/test_samlproviderconfig.py +++ /dev/null @@ -1,382 +0,0 @@ -""" -Tests for SAMLProviderConfig endpoints -""" -import copy -import re -from uuid import uuid4 -from django.urls import reverse -from django.contrib.sites.models import Site -from django.utils.http import urlencode -from rest_framework import status -from rest_framework.test import APITestCase - -from enterprise.models import EnterpriseCustomerIdentityProvider, EnterpriseCustomer -from enterprise.constants import ENTERPRISE_ADMIN_ROLE, ENTERPRISE_LEARNER_ROLE -from common.djangoapps.student.tests.factories import UserFactory -from common.djangoapps.third_party_auth.tests.samlutils import set_jwt_cookie -from common.djangoapps.third_party_auth.models import SAMLProviderConfig, SAMLConfiguration -from common.djangoapps.third_party_auth.tests.utils import skip_unless_thirdpartyauth -from common.djangoapps.third_party_auth.utils import convert_saml_slug_provider_id - -# country here refers to the URN provided by a user's IDP -SINGLE_PROVIDER_CONFIG = { - 'entity_id': 'id', - 'metadata_source': 'http://test.url', - 'name': 'name-of-config', - 'enabled': 'true', - 'slug': 'test-slug', - 'country': 'https://example.customer.com/countrycode', - 'attr_first_name': 'jon', - 'attr_last_name': 'snow', -} - -SINGLE_PROVIDER_CONFIG_2 = copy.copy(SINGLE_PROVIDER_CONFIG) -SINGLE_PROVIDER_CONFIG_2['name'] = 'name-of-config-2' -SINGLE_PROVIDER_CONFIG_2['slug'] = 'test-slug-2' -SINGLE_PROVIDER_CONFIG_2['display_name'] = 'display-name' -SINGLE_PROVIDER_CONFIG_2['entity_id'] = 'id-2' - -SINGLE_PROVIDER_CONFIG_3 = copy.copy(SINGLE_PROVIDER_CONFIG) -SINGLE_PROVIDER_CONFIG_3['name'] = 'name-of-config-3' -SINGLE_PROVIDER_CONFIG_3['slug'] = 'test-slug-3' -SINGLE_PROVIDER_CONFIG_3['entity_id'] = 'id-3' - - -ENTERPRISE_ID = str(uuid4()) -ENTERPRISE_ID_NON_EXISTENT = str(uuid4()) - - -@skip_unless_thirdpartyauth() -class SAMLProviderConfigTests(APITestCase): - """ - API Tests for SAMLProviderConfig REST endpoints - The skip annotation above exists because we currently cannot run this test in - the cms mode in CI builds, where the third_party_auth application is not loaded - """ - @classmethod - def setUpTestData(cls): - super().setUpTestData() - cls.user = UserFactory.create(username='testuser', password='testpwd') - cls.site, _ = Site.objects.get_or_create(domain='example.com') - cls.enterprise_customer = EnterpriseCustomer.objects.create( - uuid=ENTERPRISE_ID, - name='test-ep', - slug='test-ep', - site=cls.site) - cls.samlproviderconfig, _ = SAMLProviderConfig.objects.get_or_create( - entity_id=SINGLE_PROVIDER_CONFIG['entity_id'], - metadata_source=SINGLE_PROVIDER_CONFIG['metadata_source'], - slug=SINGLE_PROVIDER_CONFIG['slug'], - country=SINGLE_PROVIDER_CONFIG['country'], - ) - cls.samlconfiguration, _ = SAMLConfiguration.objects.get_or_create( - enabled=True, - site=cls.site, - slug='edxSideTest', - ) - - def setUp(self): # pylint: disable=super-method-not-called - set_jwt_cookie(self.client, self.user, [(ENTERPRISE_ADMIN_ROLE, ENTERPRISE_ID)]) - self.client.force_authenticate(user=self.user) - - def test_get_one_config_by_enterprise_uuid_found(self): - """ - GET auth/saml/v0/provider_config/?enterprise_customer_uuid=id=id - """ - - # for GET to work, we need an association present - EnterpriseCustomerIdentityProvider.objects.get_or_create( - provider_id=convert_saml_slug_provider_id(self.samlproviderconfig.slug), - enterprise_customer_id=ENTERPRISE_ID - ) - urlbase = reverse('saml_provider_config-list') - query_kwargs = {'enterprise_customer_uuid': ENTERPRISE_ID} - url = f'{urlbase}?{urlencode(query_kwargs)}' - - response = self.client.get(url, format='json') - - assert response.status_code == status.HTTP_200_OK - results = response.data['results'] - assert len(results) == 1 - assert results[0]['entity_id'] == SINGLE_PROVIDER_CONFIG['entity_id'] - assert results[0]['metadata_source'] == SINGLE_PROVIDER_CONFIG['metadata_source'] - assert response.data['results'][0]['country'] == SINGLE_PROVIDER_CONFIG['country'] - assert re.match(r"test-slug-\d{4}", results[0]['display_name']) - assert SAMLProviderConfig.objects.count() == 1 - - def test_get_one_config_by_enterprise_uuid_invalid_uuid(self): - """ - GET auth/saml/v0/provider_config/?enterprise_customer_uuid=invalidUUID - """ - urlbase = reverse('saml_provider_config-list') - query_kwargs = {'enterprise_customer_uuid': 'invalid_uuid'} - url = f'{urlbase}?{urlencode(query_kwargs)}' - - response = self.client.get(url, format='json') - - assert response.status_code == status.HTTP_400_BAD_REQUEST - - def test_get_one_config_by_enterprise_uuid_not_found(self): - """ - GET auth/saml/v0/provider_config/?enterprise_customer_uuid=valid-but-nonexistent-uuid - """ - - # the user must actually be authorized for this enterprise - # since we are testing auth passes but association to samlproviderconfig is not found - set_jwt_cookie(self.client, self.user, [(ENTERPRISE_ADMIN_ROLE, ENTERPRISE_ID_NON_EXISTENT)]) - self.client.force_authenticate(user=self.user) - - urlbase = reverse('saml_provider_config-list') - query_kwargs = {'enterprise_customer_uuid': ENTERPRISE_ID_NON_EXISTENT} - url = f'{urlbase}?{urlencode(query_kwargs)}' - orig_count = SAMLProviderConfig.objects.count() - - response = self.client.get(url, format='json') - - assert response.status_code == status.HTTP_404_NOT_FOUND - assert SAMLProviderConfig.objects.count() == orig_count - - def test_create_one_config(self): - """ - POST auth/saml/v0/provider_config/ -d data - """ - url = reverse('saml_provider_config-list') - data = copy.copy(SINGLE_PROVIDER_CONFIG_2) - data['enterprise_customer_uuid'] = ENTERPRISE_ID - orig_count = SAMLProviderConfig.objects.count() - - response = self.client.post(url, data) - - assert response.status_code == status.HTTP_201_CREATED - assert SAMLProviderConfig.objects.count() == (orig_count + 1) - provider_config = SAMLProviderConfig.objects.get(slug=SINGLE_PROVIDER_CONFIG_2['slug']) - assert provider_config.name == 'name-of-config-2' - assert provider_config.country == SINGLE_PROVIDER_CONFIG_2['country'] - assert provider_config.attr_username == SINGLE_PROVIDER_CONFIG['attr_first_name'] - assert provider_config.display_name == SINGLE_PROVIDER_CONFIG_2['display_name'] - - # check association has also been created - assert EnterpriseCustomerIdentityProvider.objects.filter( - provider_id=convert_saml_slug_provider_id(provider_config.slug) - ).exists(), 'Cannot find EnterpriseCustomer-->SAMLProviderConfig association' - - def test_create_one_config_fail_non_existent_enterprise_uuid(self): - """ - POST auth/saml/v0/provider_config/ -d data - """ - url = reverse('saml_provider_config-list') - data = copy.copy(SINGLE_PROVIDER_CONFIG_2) - data['enterprise_customer_uuid'] = ENTERPRISE_ID_NON_EXISTENT - orig_count = SAMLProviderConfig.objects.count() - - response = self.client.post(url, data) - - assert response.status_code == status.HTTP_403_FORBIDDEN - assert SAMLProviderConfig.objects.count() == orig_count - - # check association has NOT been created - assert not EnterpriseCustomerIdentityProvider.objects.filter( - provider_id=convert_saml_slug_provider_id(SINGLE_PROVIDER_CONFIG_2['slug']) - ).exists(), 'Did not expect to find EnterpriseCustomer-->SAMLProviderConfig association' - - def test_create_one_config_with_absent_enterprise_uuid(self): - """ - POST auth/saml/v0/provider_config/ -d data - """ - url = reverse('saml_provider_config-list') - data = copy.copy(SINGLE_PROVIDER_CONFIG_2) - orig_count = SAMLProviderConfig.objects.count() - - response = self.client.post(url, data) - - assert response.status_code == status.HTTP_400_BAD_REQUEST - assert SAMLProviderConfig.objects.count() == orig_count - - def test_create_one_config_with_no_country_urn(self): - """ - POST auth/saml/v0/provider_config/ -d data - """ - url = reverse('saml_provider_config-list') - provider_config_no_country = { - 'entity_id': 'id2', - 'metadata_source': 'http://test.url', - 'name': 'name-of-config-no-country', - 'enabled': 'true', - 'slug': 'test-slug-none', - 'enterprise_customer_uuid': ENTERPRISE_ID, - } - - response = self.client.post(url, provider_config_no_country) - assert response.status_code == status.HTTP_201_CREATED - provider_config = SAMLProviderConfig.objects.get(slug='test-slug-none') - assert provider_config.country == '' - - def test_create_one_config_with_empty_country_urn(self): - """ - POST auth/saml/v0/provider_config/ -d data - """ - url = reverse('saml_provider_config-list') - provider_config_blank_country = { - 'entity_id': 'id-empty-country-urn', - 'metadata_source': 'http://test.url', - 'name': 'name-of-config-blank-country', - 'enabled': 'true', - 'slug': 'test-slug-empty', - 'enterprise_customer_uuid': ENTERPRISE_ID, - 'country': '', - } - - response = self.client.post(url, provider_config_blank_country) - assert response.status_code == status.HTTP_201_CREATED - provider_config = SAMLProviderConfig.objects.get(slug='test-slug-empty') - assert provider_config.country == '' - - def test_unauthenticated_request_is_forbidden(self): - self.client.logout() - urlbase = reverse('saml_provider_config-list') - query_kwargs = {'enterprise_customer_uuid': ENTERPRISE_ID} - url = f'{urlbase}?{urlencode(query_kwargs)}' - set_jwt_cookie(self.client, self.user, [(ENTERPRISE_LEARNER_ROLE, ENTERPRISE_ID)]) - response = self.client.get(url, format='json') - assert response.status_code == status.HTTP_403_FORBIDDEN - - self.client.logout() - set_jwt_cookie(self.client, self.user, [(ENTERPRISE_ADMIN_ROLE, ENTERPRISE_ID_NON_EXISTENT)]) - response = self.client.get(url, format='json') - assert response.status_code == status.HTTP_403_FORBIDDEN - - def test_create_one_config_with_samlconfiguration(self): - """ - POST auth/saml/v0/provider_config/ -d data - """ - url = reverse('saml_provider_config-list') - data = copy.copy(SINGLE_PROVIDER_CONFIG_3) - data['enterprise_customer_uuid'] = ENTERPRISE_ID - data['saml_config_id'] = self.samlconfiguration.id - - response = self.client.post(url, data) - - assert response.status_code == status.HTTP_201_CREATED - provider_config = SAMLProviderConfig.objects.get(slug=SINGLE_PROVIDER_CONFIG_3['slug']) - assert provider_config.saml_configuration == self.samlconfiguration - - def test_unique_entity_id_constraint_with_different_slug(self): - """ - Test that a config cannot be created with an entity ID if another config already exists with that entity ID and - a different slug - """ - with self.assertLogs() as ctx: - url = reverse('saml_provider_config-list') - data = copy.copy(SINGLE_PROVIDER_CONFIG) - data['enterprise_customer_uuid'] = ENTERPRISE_ID - data['slug'] = 'some-other-slug' - - response = self.client.post(url, data) - - # 7/21/22 : Disabling the exception on duplicate entity ID's because of existing data. - assert ctx.records[-2].msg == f"Entity ID: {data['entity_id']} already taken" - assert response.status_code == status.HTTP_201_CREATED - # assert response.status_code == status.HTTP_400_BAD_REQUEST - # assert len(SAMLProviderConfig.objects.all()) == 1 - # assert str(response.data.get('non_field_errors')[0]) == f"Entity ID: {data['entity_id']} already taken" - - def test_unique_entity_id_constraint_with_same_slug(self): - """ - Test that a config can be created/edited using the same entity ID as an existing config as long as it shares an - entity ID. - """ - url = reverse('saml_provider_config-list') - data = copy.copy(SINGLE_PROVIDER_CONFIG) - data['enterprise_customer_uuid'] = ENTERPRISE_ID - data['name'] = 'some-other-name' - - response = self.client.post(url, data) - assert response.status_code == status.HTTP_201_CREATED - assert len(SAMLProviderConfig.objects.all()) == 2 - assert response.data.get('name') == 'some-other-name' - - def test_api_deleting_provider_configs(self): - """ - Test deleting a provider config. - """ - EnterpriseCustomerIdentityProvider.objects.get_or_create( - provider_id=convert_saml_slug_provider_id(self.samlproviderconfig.slug), - enterprise_customer_id=ENTERPRISE_ID - ) - url = reverse('saml_provider_config-list') - data = {} - data['enterprise_customer_uuid'] = ENTERPRISE_ID - - response = self.client.delete( - url + f'{str(self.samlproviderconfig.id)}/?enterprise_customer_uuid={ENTERPRISE_ID}' - ) - assert response.status_code == status.HTTP_200_OK - assert len(SAMLProviderConfig.objects.all()) == 1 - assert SAMLProviderConfig.objects.first().archived - - def test_api_deleting_config_then_using_deleted_entity_id(self): - """ - Test deleting a config then creating a new config with the entity ID of the deleted config - """ - EnterpriseCustomerIdentityProvider.objects.get_or_create( - provider_id=convert_saml_slug_provider_id(self.samlproviderconfig.slug), - enterprise_customer_id=ENTERPRISE_ID - ) - url = reverse('saml_provider_config-list') - data = {} - data['enterprise_customer_uuid'] = ENTERPRISE_ID - - response = self.client.delete( - url + f'{str(self.samlproviderconfig.id)}/?enterprise_customer_uuid={ENTERPRISE_ID}' - ) - assert response.status_code == status.HTTP_200_OK - assert len(SAMLProviderConfig.objects.all()) == 1 - assert SAMLProviderConfig.objects.first().archived - - data = copy.copy(SINGLE_PROVIDER_CONFIG) - data['enterprise_customer_uuid'] = ENTERPRISE_ID - data['entity_id'] = SINGLE_PROVIDER_CONFIG['entity_id'] - data['slug'] = 'idk-something-else' - - response = self.client.post(url, data) - assert response.status_code == status.HTTP_201_CREATED - assert len(SAMLProviderConfig.objects.all()) == 2 - - def test_using_an_edited_configs_entity_id_after_deleting(self): - """ - Test that editing an existing config then removing it still allows new configs to use the deleted config's - entity ID - """ - EnterpriseCustomerIdentityProvider.objects.get_or_create( - provider_id=convert_saml_slug_provider_id(self.samlproviderconfig.slug), - enterprise_customer_id=ENTERPRISE_ID - ) - url = reverse('saml_provider_config-list') - - data = copy.copy(SINGLE_PROVIDER_CONFIG) - data['saml_config_id'] = self.samlconfiguration.id - data['name'] = 'a new name' - - response = self.client.patch( - url + f'{str(self.samlproviderconfig.id)}/?enterprise_customer_uuid={ENTERPRISE_ID}', - data, - ) - assert response.status_code == status.HTTP_200_OK - assert len(SAMLProviderConfig.objects.all()) == 2 - - data = {} - data['enterprise_customer_uuid'] = ENTERPRISE_ID - response = self.client.delete( - url + f'{str(response.data.get("id"))}/?enterprise_customer_uuid={ENTERPRISE_ID}' - ) - - assert response.status_code == status.HTTP_200_OK - assert len(SAMLProviderConfig.objects.all()) == 2 - - data = copy.copy(SINGLE_PROVIDER_CONFIG_3) - data['enterprise_customer_uuid'] = ENTERPRISE_ID - data['saml_config_id'] = self.samlconfiguration.id - - response = self.client.post(url, data) - assert response.status_code == status.HTTP_201_CREATED - assert len(SAMLProviderConfig.objects.all()) == 3 diff --git a/common/djangoapps/third_party_auth/samlproviderconfig/urls.py b/common/djangoapps/third_party_auth/samlproviderconfig/urls.py deleted file mode 100644 index e274a572d925..000000000000 --- a/common/djangoapps/third_party_auth/samlproviderconfig/urls.py +++ /dev/null @@ -1,11 +0,0 @@ -""" - Viewset for auth/saml/v0/providerconfig/ -""" - -from rest_framework import routers - -from .views import SAMLProviderConfigViewSet - -saml_provider_config_router = routers.DefaultRouter() -saml_provider_config_router.register(r'provider_config', SAMLProviderConfigViewSet, basename="saml_provider_config") -urlpatterns = saml_provider_config_router.urls diff --git a/common/djangoapps/third_party_auth/samlproviderconfig/views.py b/common/djangoapps/third_party_auth/samlproviderconfig/views.py deleted file mode 100644 index 7286402df5bd..000000000000 --- a/common/djangoapps/third_party_auth/samlproviderconfig/views.py +++ /dev/null @@ -1,135 +0,0 @@ -""" -Viewset for auth/saml/v0/samlproviderconfig -""" - -from django.shortcuts import get_list_or_404 -from django.db.utils import IntegrityError -from edx_rbac.mixins import PermissionRequiredMixin -from rest_framework import permissions, viewsets, status -from rest_framework.response import Response -from rest_framework.exceptions import ParseError, ValidationError - -from enterprise.models import EnterpriseCustomerIdentityProvider, EnterpriseCustomer -from common.djangoapps.third_party_auth.utils import validate_uuid4_string - -from ..models import SAMLProviderConfig -from .serializers import SAMLProviderConfigSerializer -from ..utils import convert_saml_slug_provider_id - - -class SAMLProviderMixin: - permission_classes = [permissions.IsAuthenticated] - serializer_class = SAMLProviderConfigSerializer - - -class SAMLProviderConfigViewSet(PermissionRequiredMixin, SAMLProviderMixin, viewsets.ModelViewSet): - """ - A View to handle SAMLProviderConfig CRUD - - Usage: - NOTE: Only the GET request requires a request parameter, otherwise pass the uuid as part - of the post body - - GET /auth/saml/v0/provider_config/?enterprise-id=uuid - POST /auth/saml/v0/provider_config/ -d postData (must contain 'enterprise_customer_uuid') - DELETE /auth/saml/v0/provider_config/:pk -d postData (must contain 'enterprise_customer_uuid') - PATCH /auth/saml/v0/provider_config/:pk -d postData (must contain 'enterprise_customer_uuid') - - permission_required refers to the Django permission name defined - in enterprise.rules. - The associated rule will allow edx-rbac to check if the EnterpriseCustomer - returned by the get_permission_object method here, can be - accessed by the user making this request (request.user) - Access is only allowed if the user has the system role - of 'ENTERPRISE_ADMIN' which is defined in enterprise.constants - """ - permission_required = 'enterprise.can_access_admin_dashboard' - - def get_queryset(self): - """ - Find and return the matching providerconfig for the given enterprise uuid - if an association exists in EnterpriseCustomerIdentityProvider model - """ - if self.requested_enterprise_uuid is None: - raise ParseError('Required enterprise_customer_uuid is missing') - enterprise_customer_idps = get_list_or_404( - EnterpriseCustomerIdentityProvider, - enterprise_customer__uuid=self.requested_enterprise_uuid - ) - slug_list = [idp.provider_id for idp in enterprise_customer_idps] - saml_config_ids = [ - config.id for config in SAMLProviderConfig.objects.current_set() if config.provider_id in slug_list - ] - return SAMLProviderConfig.objects.filter(id__in=saml_config_ids) - - def destroy(self, request, *args, **kwargs): - saml_provider_config = self.get_object() - config_id = saml_provider_config.id - provider_config_provider_id = saml_provider_config.provider_id - customer_uuid = self.requested_enterprise_uuid - try: - enterprise_customer = EnterpriseCustomer.objects.get(pk=customer_uuid) - except EnterpriseCustomer.DoesNotExist: - raise ValidationError(f'Enterprise customer not found at uuid: {customer_uuid}') # lint-amnesty, pylint: disable=raise-missing-from - - enterprise_saml_provider = EnterpriseCustomerIdentityProvider.objects.filter( - enterprise_customer=enterprise_customer, - provider_id=provider_config_provider_id, - ) - enterprise_saml_provider.delete() - SAMLProviderConfig.objects.filter(id=saml_provider_config.id).update(archived=True, enabled=False) - return Response(data=config_id, status=status.HTTP_200_OK) - - @property - def requested_enterprise_uuid(self): - """ - The enterprise customer uuid from request params or post body - """ - if self.request.method in ('POST', 'PUT'): - uuid_str = self.request.POST.get('enterprise_customer_uuid') - if uuid_str is None: - raise ParseError('Required enterprise_customer_uuid is missing') - return uuid_str - else: - uuid_str = self.request.query_params.get('enterprise_customer_uuid') - if validate_uuid4_string(uuid_str) is False: - raise ParseError('Invalid UUID enterprise_customer_id') - return uuid_str - - def get_permission_object(self): - """ - Retrieve an EnterpriseCustomer uuid to do auth against - Right now this is the same as from the request object - meaning that only users belonging to the same enterprise - can access these endpoints, we have to sort out the operator role use case - """ - return self.requested_enterprise_uuid - - def create(self, request, *args, **kwargs): - """ - Process POST /auth/saml/v0/provider_config/ {postData} - """ - - customer_uuid = self.requested_enterprise_uuid - try: - enterprise_customer = EnterpriseCustomer.objects.get(pk=customer_uuid) - except EnterpriseCustomer.DoesNotExist: - raise ValidationError(f'Enterprise customer not found at uuid: {customer_uuid}') # lint-amnesty, pylint: disable=raise-missing-from - - # Create the samlproviderconfig model first - try: - serializer = self.get_serializer(data=request.data) - serializer.is_valid(raise_exception=True) - self.perform_create(serializer) - except IntegrityError as exc: - return Response(str(exc), status=status.HTTP_400_BAD_REQUEST) - - # Associate the enterprise customer with the provider - association_obj = EnterpriseCustomerIdentityProvider( - enterprise_customer=enterprise_customer, - provider_id=convert_saml_slug_provider_id(serializer.data['slug']) - ) - association_obj.save() - - headers = self.get_success_headers(serializer.data) - return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers) diff --git a/common/djangoapps/third_party_auth/samlproviderdata/tests/test_samlproviderdata.py b/common/djangoapps/third_party_auth/samlproviderdata/tests/test_samlproviderdata.py deleted file mode 100644 index 7cc4003bc496..000000000000 --- a/common/djangoapps/third_party_auth/samlproviderdata/tests/test_samlproviderdata.py +++ /dev/null @@ -1,229 +0,0 @@ -# pylint: disable=missing-module-docstring -import copy -from datetime import datetime # lint-amnesty, pylint: disable=wrong-import-order -from unittest import mock -from uuid import uuid4 # lint-amnesty, pylint: disable=wrong-import-order -from zoneinfo import ZoneInfo - -from django.contrib.sites.models import Site -from django.urls import reverse -from django.utils.http import urlencode -from enterprise.constants import ENTERPRISE_ADMIN_ROLE, ENTERPRISE_LEARNER_ROLE -from enterprise.models import EnterpriseCustomer, EnterpriseCustomerIdentityProvider -from rest_framework import status -from rest_framework.test import APITestCase - -from common.djangoapps.student.tests.factories import UserFactory -from common.djangoapps.third_party_auth.models import SAMLProviderConfig, SAMLProviderData -from common.djangoapps.third_party_auth.tests.samlutils import set_jwt_cookie -from common.djangoapps.third_party_auth.tests.utils import skip_unless_thirdpartyauth -from common.djangoapps.third_party_auth.utils import convert_saml_slug_provider_id - -SINGLE_PROVIDER_CONFIG = { - 'entity_id': 'http://entity-id-1', - 'metadata_source': 'http://test.url', - 'name': 'name-of-config', - 'enabled': 'true', - 'slug': 'test-slug' -} - -# entity_id here matches that of the providerconfig, intentionally -# that allows this data entity to be found -SINGLE_PROVIDER_DATA = { - 'entity_id': 'http://entity-id-1', - 'sso_url': 'http://test.url', - 'public_key': 'a-key0Aid98', - 'fetched_at': datetime.now(ZoneInfo("UTC")).replace(microsecond=0) -} - -SINGLE_PROVIDER_DATA_2 = copy.copy(SINGLE_PROVIDER_DATA) -SINGLE_PROVIDER_DATA_2['entity_id'] = 'http://entity-id-2' -SINGLE_PROVIDER_DATA_2['sso_url'] = 'http://test2.url' - -ENTERPRISE_ID = str(uuid4()) -BAD_ENTERPRISE_ID = str(uuid4()) - - -@skip_unless_thirdpartyauth() -class SAMLProviderDataTests(APITestCase): - """ - API Tests for SAMLProviderConfig REST endpoints - """ - @classmethod - def setUpTestData(cls): - super().setUpTestData() - cls.user = UserFactory.create(username='testuser', password='testpwd') - cls.site, _ = Site.objects.get_or_create(domain='example.com') - cls.enterprise_customer = EnterpriseCustomer.objects.create( - uuid=ENTERPRISE_ID, - name='test-ep', - slug='test-ep', - site=cls.site) - cls.saml_provider_config, _ = SAMLProviderConfig.objects.get_or_create( - entity_id=SINGLE_PROVIDER_CONFIG['entity_id'], - metadata_source=SINGLE_PROVIDER_CONFIG['metadata_source'] - ) - # the entity_id here must match that of the saml_provider_config - cls.saml_provider_data, _ = SAMLProviderData.objects.get_or_create( - entity_id=SINGLE_PROVIDER_DATA['entity_id'], - sso_url=SINGLE_PROVIDER_DATA['sso_url'], - fetched_at=SINGLE_PROVIDER_DATA['fetched_at'] - ) - cls.enterprise_customer_idp, _ = EnterpriseCustomerIdentityProvider.objects.get_or_create( - provider_id=convert_saml_slug_provider_id(cls.saml_provider_config.slug), - enterprise_customer_id=ENTERPRISE_ID - ) - - def setUp(self): # pylint: disable=super-method-not-called - # a cookie with roles: [{enterprise_admin_role: ent_id}] will be - # needed to rbac to authorize access for this view - set_jwt_cookie(self.client, self.user, [(ENTERPRISE_ADMIN_ROLE, ENTERPRISE_ID)]) - self.client.force_authenticate(user=self.user) - - def test_get_one_provider_data_success(self): - # GET auth/saml/v0/providerdata/?enterprise_customer_uuid=id - url_base = reverse('saml_provider_data-list') - query_kwargs = {'enterprise_customer_uuid': ENTERPRISE_ID} - url = f'{url_base}?{urlencode(query_kwargs)}' - - response = self.client.get(url, format='json') - - assert response.status_code == status.HTTP_200_OK - results = response.data['results'] - assert len(results) == 1 - assert results[0]['sso_url'] == SINGLE_PROVIDER_DATA['sso_url'] - - def test_get_one_provider_data_with_pk_success(self): - # GET auth/saml/v0/providerdata//?enterprise_customer_uuid=id - url_base = reverse('saml_provider_data-list') - query_kwargs = {'enterprise_customer_uuid': ENTERPRISE_ID} - url = f'{url_base}{self.saml_provider_data.id}/?{urlencode(query_kwargs)}' - - response = self.client.get(url, format='json') - assert response.status_code == status.HTTP_200_OK - assert response.data.get('id') == self.saml_provider_data.id - assert response.data.get('entity_id') == self.saml_provider_data.entity_id - assert response.data.get('sso_url') == self.saml_provider_data.sso_url - assert response.data.get('public_key') == self.saml_provider_data.public_key - - def test_create_one_provider_data_success(self): - # POST auth/saml/v0/providerdata/ -d data - url = reverse('saml_provider_data-list') - data = copy.copy(SINGLE_PROVIDER_DATA_2) - data['enterprise_customer_uuid'] = ENTERPRISE_ID - orig_count = SAMLProviderData.objects.count() - - response = self.client.post(url, data) - - assert response.status_code == status.HTTP_201_CREATED - assert SAMLProviderData.objects.count() == (orig_count + 1) - assert SAMLProviderData.objects.get( - entity_id=SINGLE_PROVIDER_DATA_2['entity_id'] - ).sso_url == SINGLE_PROVIDER_DATA_2['sso_url'] - - def test_create_one_data_with_absent_enterprise_uuid(self): - """ - POST auth/saml/v0/provider_data/ -d data - """ - url = reverse('saml_provider_data-list') - data = copy.copy(SINGLE_PROVIDER_DATA_2) - orig_count = SAMLProviderData.objects.count() - - response = self.client.post(url, data) - - assert response.status_code == status.HTTP_400_BAD_REQUEST - assert SAMLProviderData.objects.count() == orig_count - - def test_patch_one_provider_data(self): - # PATCH auth/saml/v0/providerdata/ -d data - url = reverse('saml_provider_data-detail', kwargs={'pk': self.saml_provider_data.id}) - data = { - 'sso_url': 'http://new.url' - } - data['enterprise_customer_uuid'] = ENTERPRISE_ID - orig_count = SAMLProviderData.objects.count() - - response = self.client.patch(url, data) - - assert response.status_code == status.HTTP_200_OK - assert SAMLProviderData.objects.count() == orig_count - - # ensure only the sso_url was updated - fetched_provider_data = SAMLProviderData.objects.get(pk=self.saml_provider_data.id) - assert fetched_provider_data.sso_url == 'http://new.url' - assert fetched_provider_data.fetched_at == SINGLE_PROVIDER_DATA['fetched_at'] - assert fetched_provider_data.entity_id == SINGLE_PROVIDER_DATA['entity_id'] - - def test_delete_one_provider_data(self): - # DELETE auth/saml/v0/providerdata/ -d data - url_base = reverse('saml_provider_data-detail', kwargs={'pk': self.saml_provider_data.id}) - query_kwargs = {'enterprise_customer_uuid': ENTERPRISE_ID} - url = f'{url_base}?{urlencode(query_kwargs)}' - orig_count = SAMLProviderData.objects.count() - - response = self.client.delete(url) - - assert response.status_code == status.HTTP_204_NO_CONTENT - assert SAMLProviderData.objects.count() == (orig_count - 1) - - # ensure only the sso_url was updated - query_set_count = SAMLProviderData.objects.filter(pk=self.saml_provider_data.id).count() - assert query_set_count == 0 - - def test_get_one_provider_data_failure(self): - set_jwt_cookie(self.client, self.user, [(ENTERPRISE_ADMIN_ROLE, BAD_ENTERPRISE_ID)]) - self.client.force_authenticate(user=self.user) - url_base = reverse('saml_provider_data-list') - query_kwargs = {'enterprise_customer_uuid': BAD_ENTERPRISE_ID} - url = f'{url_base}?{urlencode(query_kwargs)}' - - response = self.client.get(url, format='json') - - assert response.status_code == status.HTTP_404_NOT_FOUND - - def test_unauthenticated_request_is_forbidden(self): - self.client.logout() - urlbase = reverse('saml_provider_data-list') - query_kwargs = {'enterprise_customer_uuid': ENTERPRISE_ID} - url = f'{urlbase}?{urlencode(query_kwargs)}' - set_jwt_cookie(self.client, self.user, [(ENTERPRISE_LEARNER_ROLE, ENTERPRISE_ID)]) - response = self.client.get(url, format='json') - assert response.status_code == status.HTTP_403_FORBIDDEN - - # manually running second case as DDT is having issues. - self.client.logout() - set_jwt_cookie(self.client, self.user, [(ENTERPRISE_ADMIN_ROLE, BAD_ENTERPRISE_ID)]) - response = self.client.get(url, format='json') - assert response.status_code == status.HTTP_403_FORBIDDEN - - @mock.patch('common.djangoapps.third_party_auth.samlproviderdata.views.fetch_metadata_xml') - @mock.patch('common.djangoapps.third_party_auth.samlproviderdata.views.parse_metadata_xml') - def test_sync_one_provider_data_success(self, mock_parse, mock_fetch): - """ - POST auth/saml/v0/provider_data/sync_provider_data -d data - """ - mock_fetch.return_value = 'tag' - public_key = ['askdjf;sakdjfs;adkfjas;dkfjas;dkfjas;dlkfj'] - sso_url = 'https://fake-test.id' - expires_at = datetime.now() - mock_parse.return_value = (public_key, sso_url, expires_at) - url = reverse('saml_provider_data-sync-provider-data') - data = { - 'entity_id': 'http://entity-id-1', - 'metadata_url': 'http://a-url', - 'enterprise_customer_uuid': ENTERPRISE_ID, - } - SAMLProviderData.objects.all().delete() - orig_count = SAMLProviderData.objects.count() - - response = self.client.post(url, data) - - assert response.status_code == status.HTTP_201_CREATED - assert response.data == " Created new record(s) for SAMLProviderData for entityID http://entity-id-1" - assert SAMLProviderData.objects.count() == orig_count + 1 - - # should only update this time - response = self.client.post(url, data) - assert response.status_code == status.HTTP_200_OK - assert response.data == (" Updated existing SAMLProviderData record(s) for entityID http://entity-id-1") - assert SAMLProviderData.objects.count() == orig_count + 1 diff --git a/common/djangoapps/third_party_auth/samlproviderdata/urls.py b/common/djangoapps/third_party_auth/samlproviderdata/urls.py deleted file mode 100644 index 18b908dc6f39..000000000000 --- a/common/djangoapps/third_party_auth/samlproviderdata/urls.py +++ /dev/null @@ -1,11 +0,0 @@ -""" - url mappings for auth/saml/v0/providerdata/ -""" - -from rest_framework import routers - -from .views import SAMLProviderDataViewSet - -saml_provider_data_router = routers.DefaultRouter() -saml_provider_data_router.register(r'provider_data', SAMLProviderDataViewSet, basename="saml_provider_data") -urlpatterns = saml_provider_data_router.urls diff --git a/common/djangoapps/third_party_auth/samlproviderdata/views.py b/common/djangoapps/third_party_auth/samlproviderdata/views.py deleted file mode 100644 index b5d044bd0498..000000000000 --- a/common/djangoapps/third_party_auth/samlproviderdata/views.py +++ /dev/null @@ -1,146 +0,0 @@ -""" - Viewset for auth/saml/v0/samlproviderdata -""" -from datetime import datetime -import logging -from requests.exceptions import SSLError, MissingSchema, HTTPError - -from django.http import Http404 -from django.shortcuts import get_object_or_404 -from edx_rbac.mixins import PermissionRequiredMixin -from enterprise.models import EnterpriseCustomerIdentityProvider -from rest_framework import permissions, status, viewsets -from rest_framework.decorators import action -from rest_framework.exceptions import ParseError -from rest_framework.response import Response - -from common.djangoapps.third_party_auth.utils import ( - convert_saml_slug_provider_id, - create_or_update_bulk_saml_provider_data, - fetch_metadata_xml, - parse_metadata_xml, - validate_uuid4_string -) - -from ..models import SAMLProviderConfig, SAMLProviderData -from .serializers import SAMLProviderDataSerializer - -log = logging.getLogger(__name__) - - -class SAMLProviderDataMixin: - permission_classes = [permissions.IsAuthenticated] - serializer_class = SAMLProviderDataSerializer - - -class SAMLProviderDataViewSet(PermissionRequiredMixin, SAMLProviderDataMixin, viewsets.ModelViewSet): - """ - A View to handle SAMLProviderData CRUD. - Uses the edx-rbac mixin PermissionRequiredMixin to apply enterprise authorization - - Usage: - NOTE: Only the GET request requires a request parameter, otherwise pass the uuid as part - of the post body - - GET /auth/saml/v0/provider_data/?enterprise-id=uuid - POST /auth/saml/v0/provider_data/ -d postData (must contain 'enterprise_customer_uuid') - DELETE /auth/saml/v0/provider_data/:pk -d postData (must contain 'enterprise_customer_uuid') - PATCH /auth/saml/v0/provider_data/:pk -d postData (must contain 'enterprise_customer_uuid') - POST /auth/saml/v0/provider_data/sync_provider_data (fetches metadata info from metadata url provided) - - """ - permission_required = 'enterprise.can_access_admin_dashboard' - - def get_queryset(self): - """ - Find and return the matching providerid for the given enterprise uuid - Note: There is no direct association between samlproviderdata and enterprisecustomer. - So we make that association in code via samlproviderdata > samlproviderconfig ( via entity_id ) - then, we fetch enterprisecustomer via samlproviderconfig > enterprisecustomer ( via association table ) - """ - if self.requested_enterprise_uuid is None: - raise ParseError('Required enterprise_customer_uuid is missing') - enterprise_customer_idp = get_object_or_404( - EnterpriseCustomerIdentityProvider, - enterprise_customer__uuid=self.requested_enterprise_uuid - ) - try: - saml_provider = SAMLProviderConfig.objects.current_set().get( - slug=convert_saml_slug_provider_id(enterprise_customer_idp.provider_id)) - except SAMLProviderConfig.DoesNotExist: - raise Http404('No matching SAML provider found.') # lint-amnesty, pylint: disable=raise-missing-from - provider_data_id = self.request.parser_context.get('kwargs').get('pk') - if provider_data_id: - return SAMLProviderData.objects.filter(id=provider_data_id) - return SAMLProviderData.objects.filter(entity_id=saml_provider.entity_id) - - @property - def requested_enterprise_uuid(self): - """ - The enterprise customer uuid from request params or post body - """ - if self.request.method in ('POST', 'PATCH'): - uuid_str = self.request.POST.get('enterprise_customer_uuid') - if uuid_str is None: - raise ParseError('Required enterprise_customer_uuid is missing') - return uuid_str - else: - uuid_str = self.request.query_params.get('enterprise_customer_uuid') - if validate_uuid4_string(uuid_str) is False: - raise ParseError('Invalid UUID enterprise_customer_id') - return uuid_str - - def get_permission_object(self): - """ - Retrieve an EnterpriseCustomer to do auth against - """ - return self.requested_enterprise_uuid - - @action(detail=False, methods=['post', 'put']) - def sync_provider_data(self, request): - """ - Creates or updates a SAMProviderData record using info fetched from remote SAML metadata - For now we will require entityID but in future we will enhance this to try and extract entityID - from the metadata file, and make entityId optional, and return error response if there are - multiple entityIDs listed so that the user can choose and retry with a specified entityID - """ - entity_id = request.POST.get('entity_id') - metadata_url = request.POST.get('metadata_url') - sso_url = request.POST.get('sso_url') - public_keys = request.POST.get('public_key') - if not entity_id: - return Response('entity_id is required', status.HTTP_400_BAD_REQUEST) - if not metadata_url and not (sso_url and public_keys): - return Response('either metadata_url or sso and public key are required', status.HTTP_400_BAD_REQUEST) - if metadata_url and (sso_url or public_keys): - return Response( - 'either metadata_url or sso and public key can be provided, not both', status.HTTP_400_BAD_REQUEST - ) - - if metadata_url: - # part 1: fetch information from remote metadata based on metadataUrl in samlproviderconfig - try: - xml = fetch_metadata_xml(metadata_url) - except (SSLError, MissingSchema, HTTPError) as ex: - msg = f'Could not verify provider metadata url. Exc type: {type(ex).__name__}' - log.warning(msg) - return Response(msg, status.HTTP_406_NOT_ACCEPTABLE) - - # part 2: create/update samlproviderdata - log.info("Processing IdP with entityID %s", entity_id) - public_keys, sso_url, expires_at = parse_metadata_xml(xml, entity_id) - else: - now = datetime.now() - expires_at = now.replace(year=now.year + 10) - changed = create_or_update_bulk_saml_provider_data(entity_id, public_keys, sso_url, expires_at) - if changed: - str_message = f" Created new record(s) for SAMLProviderData for entityID {entity_id}" - log.info(str_message) - response = str_message - http_status = status.HTTP_201_CREATED - else: - str_message = f" Updated existing SAMLProviderData record(s) for entityID {entity_id}" - log.info(str_message) - response = str_message - http_status = status.HTTP_200_OK - return Response(response, status=http_status) diff --git a/common/djangoapps/third_party_auth/urls.py b/common/djangoapps/third_party_auth/urls.py index a5908da26fd0..843ff15d2bab 100644 --- a/common/djangoapps/third_party_auth/urls.py +++ b/common/djangoapps/third_party_auth/urls.py @@ -1,7 +1,6 @@ """Url configuration for the auth module.""" -from django.urls import include -from django.urls import path, re_path +from django.urls import include, path, re_path from .views import ( IdPRedirectView, @@ -26,7 +25,5 @@ name='custom_disconnect_json_individual' ), path('auth/', include('social_django.urls', namespace='social')), - path('auth/saml/v0/', include('common.djangoapps.third_party_auth.samlproviderconfig.urls')), - path('auth/saml/v0/', include('common.djangoapps.third_party_auth.samlproviderdata.urls')), path('auth/saml/v0/', include('common.djangoapps.third_party_auth.saml_configuration.urls')), ]