Skip to content

Commit

Permalink
fix #OPPIA-983
Browse files Browse the repository at this point in the history
  • Loading branch information
alexlittle committed Mar 17, 2023
1 parent 5cf7d87 commit 8ef7f6e
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 60 deletions.
81 changes: 31 additions & 50 deletions profile/mixins/ExportAsCSVMixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@ def load_csv_fields(cls, instance):
for field_name in cls.available_fields:
label = instance.get_field_label(field_name)
if label:
cls.__csv_fields[field_name] = {'label': str(label),
'type': 'base'}
cls.__csv_fields[field_name] = {'label': str(label), 'type': 'base'}

custom_fields = CustomField.objects.all().order_by('order')
for field in custom_fields:
cls.__csv_fields[field.id] = {'label': str(field.label),
'type': field.type}
cls.__csv_fields[field.id] = {'label': str(field.label), 'type': field.type}

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand All @@ -39,53 +37,40 @@ def __init__(self, *args, **kwargs):
# If None, the field does not exist
def get_field_label(self, field_name):
if hasattr(self.model, field_name):
try:
label = self.model._meta.get_field(
field_name).verbose_name.strip()
if field_name in self.field_labels:
return self.field_labels[field_name]
else:
return label
except FieldDoesNotExist:
# If it is not a field, we try to find a property with that
# name
if field_name in dir(self.model) or \
isinstance(getattr(self.model, field_name), property):
if field_name in self.field_labels:
return self.field_labels[field_name]

return self.get_field_label_from_attributes(field_name)
elif field_name in self.field_labels:
return self.field_labels[field_name]

else:
# We try to find a foreignKey model field
field_model = field_name.split('__')
if len(field_model) == 2 and field_model[0] in dir(self.model):
model = self.model._meta.get_field(
field_model[0]).remote_field.model
return model._meta.get_field(
field_model[1]).verbose_name.strip()

model = self.model._meta.get_field(field_model[0]).remote_field.model
return model._meta.get_field(field_model[1]).verbose_name.strip()
return None

def export_csv(self,
request,
object_list,
filter_list=None,
*args,
**kwargs):
def get_field_label_from_attributes(self, field_name):
try:
label = self.model._meta.get_field(field_name).verbose_name.strip()
if field_name in self.field_labels:
return self.field_labels[field_name]
else:
return label
except FieldDoesNotExist:
# If it is not a field, we try to find a property with that name
if field_name in dir(self.model) or isinstance(getattr(self.model, field_name), property):
if field_name in self.field_labels:
return self.field_labels[field_name]
return None

def export_csv(self, request, object_list, filter_list=None, *args, **kwargs):

now = datetime.datetime.now()
response = HttpResponse(content_type='text/csv')
response['Content-Disposition'] = \
'attachment; filename="{}_{}.csv'.format(self.csv_filename,
now.strftime('%Y%m%d'))
'attachment; filename="{}_{}.csv'.format(self.csv_filename, now.strftime('%Y%m%d'))

response.write(u'\ufeff'.encode('utf-8'))
writer = csv.writer(response,
dialect='excel',
delimiter=str(','),
quotechar=str('"'))
writer = csv.writer(response, dialect='excel', delimiter=str(','), quotechar=str('"'))

# If no field was selected, we export all of them
if filter_list is None or len(filter_list) == 0:
Expand All @@ -100,23 +85,24 @@ def export_csv(self,
final_fields.append(field)

writer.writerow(header_row)
writer = self.export_csv_write_rows(writer, object_list, final_fields)

return response

def export_csv_write_rows(self, writer, object_list, final_fields):
for elem in object_list:
results = []
for field in final_fields:
if self.__csv_fields[field]['type'] == 'base':
value = self.get_field_value(elem, field)
else:
customfield = UserProfileCustomField.objects.filter(
key_name=field, user=elem).first()
customfield = UserProfileCustomField.objects.filter(key_name=field, user=elem).first()
value = customfield.get_value() if customfield else None
results.append(value)
writer.writerow(results)
return writer

return response

# Method to access a value by field name, traversing the foreign key
# objects
# Method to access a value by field name, traversing the foreign key objects
def get_field_value(self, instance, field):
field_path = field.split('__')
attr = instance
Expand Down Expand Up @@ -147,8 +133,7 @@ def get_context_data(self, **kwargs):
context['export_csv_fields'] = self.__csv_fields

if self.export_filter_form:
context['export_filter_form'] = self.export_filter_form(
self.request.GET)
context['export_filter_form'] = self.export_filter_form(self.request.GET)

return context

Expand All @@ -163,10 +148,6 @@ def get(self, request, *args, **kwargs):
filter_list = request.GET.getlist('csv_fields[]', None)
object_list = self.get_list_to_export()

return self.export_csv(request,
object_list,
filter_list,
*args,
**kwargs)
return self.export_csv(request, object_list, filter_list, *args, **kwargs)

return super().get(request, *args, **kwargs)
15 changes: 5 additions & 10 deletions tests/profile/views/test_download_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ class DownloadDataViewsTest(OppiaTestCase):

def test_invalid_download(self):
self.client.force_login(self.normal_user)
response = self.client.get(reverse(self.STR_URL,
args=['invalid']))
response = self.client.get(reverse(self.STR_URL, args=['invalid']))
self.assertEqual(404, response.status_code)

def test_profile(self):
Expand All @@ -28,25 +27,21 @@ def test_profile(self):
self.teacher_user,
self.staff_user]:
self.client.force_login(user)
response = self.client.get(reverse(self.STR_URL,
args=['profile']))
response = self.client.get(reverse(self.STR_URL, args=['profile']))
self.assertEqual(200, response.status_code)
self.assertTemplateUsed(response, 'profile/export/profile.html')
self.assertEqual(response['content-type'],
self.STR_EXPECTED_CONTENT_TYPE)
self.assertEqual(response['content-type'], self.STR_EXPECTED_CONTENT_TYPE)

def test_points(self):
for user in [self.normal_user,
self.admin_user,
self.teacher_user,
self.staff_user]:
self.client.force_login(user)
response = self.client.get(reverse(self.STR_URL,
args=['points']))
response = self.client.get(reverse(self.STR_URL, args=['points']))
self.assertEqual(200, response.status_code)
self.assertTemplateUsed(response, 'profile/export/points.html')
self.assertEqual(response['content-type'],
self.STR_EXPECTED_CONTENT_TYPE)
self.assertEqual(response['content-type'], self.STR_EXPECTED_CONTENT_TYPE)

def test_badges(self):
for user in [self.normal_user,
Expand Down
74 changes: 74 additions & 0 deletions tests/profile/views/test_export_users.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import csv
import io

from django.urls import reverse
from oppia.test import OppiaTestCase


class ExportUsersViewsTest(OppiaTestCase):

fixtures = ['tests/test_user.json',
'tests/test_oppia.json',
'tests/test_permissions.json',
'tests/test_course_permissions.json',
'tests/test_customfields.json']

STR_EXPECTED_CONTENT_TYPE = 'text/csv'
STR_URL = 'profile:users_list'

def test_permissions(self):
for user in [self.normal_user,
self.teacher_user]:
self.client.force_login(user)
response = self.client.get(reverse(self.STR_URL) + "?export=csv&username=&first_name=&last_name=&email=" +
"&is_active=&is_staff=&start_date=&end_date=&userprofilecustomfield_test=")
self.assertEqual(403, response.status_code)

def test_download_all_users_all_data(self):
for user in [self.admin_user,
self.staff_user]:
self.client.force_login(user)
response = self.client.get(reverse(self.STR_URL) + "?export=csv&username=&first_name=&last_name=" +
"&email=&is_active=&is_staff=&start_date=&end_date=" +
"&userprofilecustomfield_test=")
self.assertEqual(200, response.status_code)
self.assertEqual(response['content-type'], self.STR_EXPECTED_CONTENT_TYPE)
csv_file = csv.DictReader(io.StringIO(response.content.decode()))
self.assertEqual(6, len(list(csv_file)))

def test_download_filtered_by_registration_date_all_data(self):
for user in [self.admin_user,
self.staff_user]:
self.client.force_login(user)
response = self.client.get(reverse(self.STR_URL) + "?export=csv&username=&first_name=&last_name=&email=" +
"&is_active=&is_staff=&start_date=2022-01-01&end_date=2022-12-31" +
"&userprofilecustomfield_test=")
self.assertEqual(200, response.status_code)
self.assertEqual(response['content-type'], self.STR_EXPECTED_CONTENT_TYPE)
csv_file = csv.DictReader(io.StringIO(response.content.decode()))
self.assertEqual(0, len(list(csv_file)))

def test_download_all_users_username_only(self):
for user in [self.admin_user,
self.staff_user]:
self.client.force_login(user)
response = self.client.get(reverse(self.STR_URL) + "?export=csv&username=&first_name=&last_name=&email=" +
"&is_active=&is_staff=&start_date=&end_date=&userprofilecustomfield_test=" +
"&csv_fields%5B%5D=username")
self.assertEqual(200, response.status_code)
self.assertEqual(response['content-type'], self.STR_EXPECTED_CONTENT_TYPE)
csv_file = csv.DictReader(io.StringIO(response.content.decode()))
self.assertEqual(6, len(list(csv_file)))

def test_download_all_users_invalid_field(self):
for user in [self.admin_user,
self.staff_user]:
self.client.force_login(user)
response = self.client.get(reverse(self.STR_URL) + "?export=csv&username=&first_name=&last_name=&email=" +
"&notafield=&is_staff=&start_date=&end_date=&userprofilecustomfield_test=" +
"&userprofilecustomfield_notafield=&csv_fields%5B%5D=invalidnotafield")
self.assertEqual(200, response.status_code)
self.assertEqual(response['content-type'], self.STR_EXPECTED_CONTENT_TYPE)
csv_file = csv.DictReader(io.StringIO(response.content.decode()))
self.assertEqual(0, len(list(csv_file)))

0 comments on commit 8ef7f6e

Please sign in to comment.