Skip to content
175 changes: 168 additions & 7 deletions tubular/scripts/delete_expired_partner_gdpr_reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@
"""


from collections import OrderedDict
from datetime import datetime, timedelta
from dateutil.parser import parse
from functools import partial
from os import path
import io
import json
import logging
import sys
import unicodedata

import click
from six import text_type
import yaml
from pytz import UTC

Expand All @@ -30,13 +34,20 @@

# Route all log output (DEBUG level and above) to stdout.
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

# Deletion notification template for files about to be deleted.
# Format variables:
#   tags     - the mention string addressed in the greeting ("Dear {tags}")
#   filename - name of the report file that is being deleted
# The surrounding newlines of the triple-quoted literal are removed by .strip(),
# so the resulting message is a single line.
DELETION_NOTIFICATION_MESSAGE_TEMPLATE = """
Hello from edX. Dear {tags}, this is an automated notice that the retirement report file "{filename}" in your Google Drive folder is being deleted as part of our data retention policy.
""".strip()

# Return codes for various fail cases.
# Each failure path uses its own distinct negative value.
ERR_NO_CONFIG = -1
ERR_BAD_CONFIG = -2
ERR_NO_SECRETS = -3
ERR_BAD_SECRETS = -4
ERR_DELETING_REPORTS = -5
ERR_BAD_AGE = -6
ERR_DRIVE_LISTING = -7  # listing the partner folders on Drive failed or returned nothing


def _config_or_exit(config_file, google_secrets_file):
Expand Down Expand Up @@ -66,6 +77,154 @@ def _config_or_exit(config_file, google_secrets_file):
FAIL_EXCEPTION(ERR_BAD_SECRETS, 'Failed to read secrets file {}'.format(google_secrets_file), exc)


def _config_drive_folder_map_or_exit(config, as_user_account=False):
    """
    Discover the partner sub-folders on Drive and record them in the config.

    Lists the folders directly beneath config['drive_partners_folder'] (no
    recursion) and stores an ordered {partner name: folder id} mapping under
    config['partner_folder_mapping'], so that notifications target the same
    folder structure that deletion uses. Folder names are NFKC-normalized
    before being used as keys.

    Failures are reported through FAIL / FAIL_EXCEPTION with ERR_DRIVE_LISTING
    (presumably terminating the script, as the function name suggests).

    Args:
        config (dict): Configuration dictionary; mutated in place.
        as_user_account (bool): Whether using OAuth2 user account authentication.
    """
    drive = DriveApi(config['google_secrets_file'], as_user_account=as_user_account)

    try:
        LOG('Attempting to find all partner sub-directories on Drive.')
        folders = drive.walk_files(
            config['drive_partners_folder'],
            mimetype='application/vnd.google-apps.folder',
            recurse=False
        )
    except Exception as exc:  # pylint: disable=broad-except
        FAIL_EXCEPTION(ERR_DRIVE_LISTING, 'Finding partner directories on Drive failed.', exc)

    # An empty listing is treated as a failure (e.g. missing permissions).
    if not folders:
        FAIL(ERR_DRIVE_LISTING, 'Finding partner directories on Drive failed. Check your permissions.')

    # Publish the mapping on the config first, then fill it folder by folder.
    folder_mapping = config['partner_folder_mapping'] = OrderedDict()
    for entry in folders:
        # Normalize so visually identical Unicode names compare equal.
        normalized_name = unicodedata.normalize('NFKC', text_type(entry['name']))
        entry['name'] = normalized_name
        folder_mapping[normalized_name] = entry['id']

    partner_names = ', '.join(folder_mapping.keys())
    LOG('Found {} partner folder(s): {}'.format(len(folder_mapping), partner_names))


def _get_external_emails_for_partners(drive, config):
"""
Extract external email addresses from partner folder permissions.

Args:
drive (DriveApi): Initialized Drive API client.
config (dict): Configuration dictionary containing partner_folder_mapping and denied_notification_domains.

Returns:
dict: Mapping of partner names to lists of external email addresses (denied domains filtered out).
"""
partners = list(config['partner_folder_mapping'].keys())

folder_ids = {config['partner_folder_mapping'][partner] for partner in partners}

partner_folders_to_permissions = drive.list_permissions_for_files(
folder_ids,
fields='emailAddress',
)

permissions = {
partner: partner_folders_to_permissions.get(config['partner_folder_mapping'][partner]) or []
for partner in partners
}

denied_domains = config.get('denied_notification_domains', [])
external_emails = {
partner: [
perm['emailAddress']
for perm in permissions[partner]
if not any(
perm['emailAddress'].lower().endswith(denied_domain.lower())
for denied_domain in denied_domains
)
Comment thread
ttak-apphelix marked this conversation as resolved.
]
for partner in permissions
}

return external_emails
Comment thread
ttak-apphelix marked this conversation as resolved.


def _send_deletion_notifications(config, age_in_days, as_user_account, mimetype='text/csv'):
    """
    Send deletion notifications for files that are about to be deleted.

    For every partner folder in config['partner_folder_mapping'], finds files
    of the given mimetype whose name starts with the report prefix and whose
    creation time is older than the retention cutoff, then posts a comment on
    each such file mentioning the partner's external contacts.

    This is best-effort: any error is logged as a warning and never raised, so
    a notification failure does not block the deletion that follows. An error
    while processing one partner does not stop the remaining partners.

    Args:
        config (dict): Configuration dictionary
        age_in_days (int): Days before files are deleted (retention period)
        as_user_account (bool): Whether using OAuth2 user account authentication
        mimetype (str): Mimetype of files to check. Defaults to 'text/csv'.
    """
    LOG('Sending deletion notifications for files older than {} days'.format(age_in_days))

    try:
        drive = DriveApi(config['google_secrets_file'], as_user_account=as_user_account)
        delete_before_dt = datetime.now(UTC) - timedelta(days=age_in_days)

        external_emails = _get_external_emails_for_partners(drive, config)

        platform_name = config['partner_report_platform_name']
        file_prefix = '{}_{}'.format(REPORTING_FILENAME_PREFIX, platform_name)
        # Tolerate a missing or null setting.
        exempted_partners = config.get('exempted_partners') or []

        for partner, folder_id in config['partner_folder_mapping'].items():
            partner_emails = external_emails.get(partner) or []

            # Skip if no external POC (unless exempt): with nobody to mention
            # there is nothing to send; only non-exempt partners get a warning.
            if not partner_emails:
                if partner not in exempted_partners:
                    LOG('WARNING: Partner "{}" has no POC for deletion notifications'.format(partner))
                continue

            # The mention string is the same for every file of this partner.
            tag_string = ' '.join('+' + email for email in partner_emails)

            try:
                files = drive.walk_files(
                    folder_id,
                    file_fields='id, name, createdTime',
                    mimetype=mimetype,
                    recurse=False
                )

                files_to_notify = []

                for report_file in files:
                    file_name = report_file.get('name', 'unknown')

                    # Filter on the name first so unrelated files are never
                    # date-parsed and cannot abort this partner's notifications.
                    if not file_name.startswith(file_prefix):
                        continue

                    if parse(report_file['createdTime']) >= delete_before_dt:
                        # Still within the retention period.
                        continue

                    comment_content = DELETION_NOTIFICATION_MESSAGE_TEMPLATE.format(
                        tags=tag_string,
                        filename=file_name
                    )

                    files_to_notify.append((report_file['id'], comment_content))
                    LOG('File marked for deletion notification: {}'.format(file_name))

                if files_to_notify:
                    drive.create_comments_for_files(files_to_notify)
                    LOG('Sent {} deletion notification(s) for partner "{}"'.format(len(files_to_notify), partner))

            except Exception as exc:  # pylint: disable=broad-except
                # Per-partner failure: log and move on to the next partner.
                LOG('WARNING: Error checking files for partner "{}": {}'.format(partner, exc))

    except Exception as exc:  # pylint: disable=broad-except
        # Deliberately swallowed so that deletion still proceeds.
        LOG('WARNING: Error in deletion notification check: {}. Continuing with deletion process.'.format(exc))


@click.command("delete_expired_reports")
@click.option(
'--config_file',
Expand Down Expand Up @@ -96,8 +255,7 @@ def _config_or_exit(config_file, google_secrets_file):
default=False,
help=(
'Feature flag to enable deletion notifications for GDPR partner reports. '
'Currently logs intent only; future implementation will send actual notifications. '
'See BOMS-398 for details.'
'When enabled, sends notifications to partners before files are deleted. '
),
show_default=True,
)
Expand All @@ -106,16 +264,12 @@ def delete_expired_reports(
):
"""
Performs the partner report deletion as needed.
Sends deletion notifications to users before files are deleted.
"""
LOG('Starting partner report deletion using config file "{}", Google config "{}", and {} days back'.format(
config_file, google_secrets_file, age_in_days
))

if enable_delete_notification:
LOG('Delete notification enabled - would send notifications for deleted reports')
else:
LOG('Delete notification disabled')

if not config_file:
FAIL(ERR_NO_CONFIG, 'No config file passed in.')

Expand All @@ -128,6 +282,13 @@ def delete_expired_reports(
config = _config_or_exit(config_file, google_secrets_file)

try:
if enable_delete_notification:
LOG('Delete notification enabled - sending notifications for deleted reports')
_config_drive_folder_map_or_exit(config, as_user_account)
_send_deletion_notifications(config, age_in_days, as_user_account, mimetype='text/csv')
else:
LOG('Delete notification disabled')

Comment thread
ttak-apphelix marked this conversation as resolved.
delete_before_dt = datetime.now(UTC) - timedelta(days=age_in_days)
drive = DriveApi(
config['google_secrets_file'], as_user_account=as_user_account
Expand Down
Loading
Loading