Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
337 changes: 323 additions & 14 deletions openedx/core/djangoapps/notifications/email/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from django.conf import settings
from django.contrib.auth import get_user_model
from django.db import transaction
from django.utils import timezone as django_timezone
from django.utils.translation import gettext as _, override as translation_override
from edx_ace import ace
from edx_ace.recipient import Recipient
Expand All @@ -17,6 +18,7 @@

from openedx.core.djangoapps.notifications.email_notifications import EmailCadence
from openedx.core.djangoapps.notifications.models import (
DigestSchedule,
Notification,
NotificationPreference,
)
Expand Down Expand Up @@ -114,27 +116,334 @@ def send_digest_email_to_user(
message = add_headers_to_email_message(message, message_context)
message.options['skip_disable_user_policy'] = True
ace.send(message)
notifications.update(email_sent_on=datetime.now())
notifications.update(email_sent_on=django_timezone.now())
send_user_email_digest_sent_event(user, cadence_type, notifications_list, message_context)
logger.info(f'<Email Cadence> Email sent to {user.username} ==Temp Log==')


@shared_task(ignore_result=True)
def get_next_digest_delivery_time(cadence_type):
"""
Calculate the next delivery time for a digest email based on cadence type.

Uses Django settings for configurable delivery time/day:
- NOTIFICATION_DAILY_DIGEST_DELIVERY_HOUR (default: 17)
- NOTIFICATION_DAILY_DIGEST_DELIVERY_MINUTE (default: 0)
- NOTIFICATION_WEEKLY_DIGEST_DELIVERY_DAY (default: 0 = Monday)
- NOTIFICATION_WEEKLY_DIGEST_DELIVERY_HOUR (default: 17)
- NOTIFICATION_WEEKLY_DIGEST_DELIVERY_MINUTE (default: 0)

Returns:
datetime: The next scheduled delivery time in UTC.
"""
now = django_timezone.now()

if cadence_type == EmailCadence.DAILY:
delivery_hour = max(0, min(23, getattr(settings, 'NOTIFICATION_DAILY_DIGEST_DELIVERY_HOUR', 17)))
delivery_minute = max(0, min(59, getattr(settings, 'NOTIFICATION_DAILY_DIGEST_DELIVERY_MINUTE', 0)))

# Calculate next delivery time
delivery_time = now.replace(
hour=delivery_hour,
minute=delivery_minute,
second=0,
microsecond=0
)
# If the delivery time has already passed today, schedule for tomorrow
if delivery_time <= now:
delivery_time += timedelta(days=1)

return delivery_time

elif cadence_type == EmailCadence.WEEKLY:
delivery_day = max(0, min(6, getattr(settings, 'NOTIFICATION_WEEKLY_DIGEST_DELIVERY_DAY', 0))) # 0=Monday
delivery_hour = max(0, min(23, getattr(settings, 'NOTIFICATION_WEEKLY_DIGEST_DELIVERY_HOUR', 17)))
delivery_minute = max(0, min(59, getattr(settings, 'NOTIFICATION_WEEKLY_DIGEST_DELIVERY_MINUTE', 0)))

# Calculate next delivery day
days_ahead = delivery_day - now.weekday()
if days_ahead < 0:
days_ahead += 7

delivery_time = now.replace(
hour=delivery_hour,
minute=delivery_minute,
second=0,
microsecond=0
) + timedelta(days=days_ahead)

# If the delivery time is today but has already passed, schedule for next week
if delivery_time <= now:
delivery_time += timedelta(days=7)

return delivery_time

raise ValueError(f"Invalid cadence_type for digest scheduling: {cadence_type}")


def get_digest_dedupe_key(user_id, cadence_type, delivery_time):
"""
Generate a deduplication key for a digest email task.

This key ensures that only one digest task is scheduled per user per cadence period.

Returns:
str: A unique key based on user_id, cadence_type, and delivery window.
"""
window_key = delivery_time.strftime('%Y-%m-%d-%H')
return f"digest:{user_id}:{cadence_type}:{window_key}"

Comment on lines +193 to +194

def is_digest_already_scheduled(user_id, cadence_type, delivery_time):
"""
Check if a digest email is already scheduled for this user in the current cadence window.

This prevents duplicate scheduling when multiple notifications arrive
in the same digest window.

Uses DigestSchedule model for an exact (user, cadence_type, delivery_time) lookup —
one record represents one pending Celery task. This is intentionally separate from
Notification.email_scheduled, which tracks the immediate/buffer cadence flow and
operates at the notification row level rather than the task level.
"""
if cadence_type not in [EmailCadence.DAILY, EmailCadence.WEEKLY]:
return False

return DigestSchedule.objects.filter(
user_id=user_id,
cadence_type=cadence_type,
delivery_time=delivery_time,
).exists()


def is_digest_already_sent_in_window(user_id, cadence_type, delivery_time):
"""
Check if a digest email has already been sent for this user in the current cadence window.

This prevents duplicate emails when both cron jobs and delayed tasks co-exist.
"""
if cadence_type == EmailCadence.DAILY:
window_start = delivery_time - timedelta(days=1)
elif cadence_type == EmailCadence.WEEKLY:
window_start = delivery_time - timedelta(days=7)
else:
return False

return Notification.objects.filter(
user_id=user_id,
email=True,
email_sent_on__gte=window_start,
email_sent_on__lte=delivery_time,
).exists()


def schedule_user_digest_email(user_id, cadence_type):
Comment on lines +227 to +239
"""
Schedule a delayed Celery task to send a digest email to a user.

This is called when a notification is created for a user who has
Daily or Weekly email cadence. It:
1. Calculates the next delivery time based on settings
2. Checks if a digest task is already scheduled for this window
3. Marks the notification as scheduled
4. Schedules a delayed Celery task with apply_async(eta=...)

The check-then-act logic is wrapped in a transaction to prevent
race conditions when multiple notifications arrive concurrently.

Args:
user_id: ID of the user to send digest to
cadence_type: EmailCadence.DAILY or EmailCadence.WEEKLY
"""
if not is_email_notification_flag_enabled():
return

if cadence_type not in [EmailCadence.DAILY, EmailCadence.WEEKLY]:
logger.warning(f'<Digest Schedule> Invalid cadence_type {cadence_type} for user {user_id}')
return
Comment on lines +260 to +262

delivery_time = get_next_digest_delivery_time(cadence_type)


with transaction.atomic():

task_id = get_digest_dedupe_key(user_id, cadence_type, delivery_time)
_schedule, created = DigestSchedule.objects.get_or_create(
user_id=user_id,
cadence_type=cadence_type,
delivery_time=delivery_time,
defaults={'task_id': task_id},
)

if not created:
# Another worker already scheduled this window.
logger.info(
f'<Digest Schedule> Digest already scheduled for user {user_id}, '
f'cadence={cadence_type}, delivery_time={delivery_time}'
)
return

if is_digest_already_sent_in_window(user_id, cadence_type, delivery_time):
logger.info(
f'<Digest Schedule> Digest already sent for user {user_id} in this window, '
f'cadence={cadence_type}, delivery_time={delivery_time}'
)
# Remove the record we just created — no task needed.
_schedule.delete()
return

# Mark unscheduled notifications for this user as scheduled.

if cadence_type == EmailCadence.DAILY:
window_start = delivery_time - timedelta(days=1)
else:
window_start = delivery_time - timedelta(days=7)

updated = Notification.objects.filter(
user_id=user_id,
email=True,
email_scheduled=False,
email_sent_on__isnull=True,
created__gte=window_start,
).update(email_scheduled=True)

if updated == 0:
logger.info(
f'<Digest Schedule> No unsent notifications to schedule for user {user_id}'
)
# Remove the record — nothing to deliver.
_schedule.delete()
return

# Schedule the delayed celery task
send_user_digest_email_task.apply_async(
kwargs={
'user_id': user_id,
'cadence_type': cadence_type,
},
eta=delivery_time,
task_id=task_id,
)

logger.info(
f'<Digest Schedule> Scheduled {cadence_type} digest for user {user_id} '
f'at {delivery_time} (task_id={task_id})'
)


@shared_task(bind=True, ignore_result=True, max_retries=3, default_retry_delay=300)
Comment on lines +320 to +333
@set_code_owner_attribute
def send_digest_email_to_all_users(cadence_type):
def send_user_digest_email_task(self, user_id, cadence_type):
"""
Send email digest to all eligible users
Delayed Celery task to send a digest email to a single user.

This task is scheduled with apply_async(eta=...) for the configured
delivery time. When it fires:
Comment on lines 336 to +340
1. Checks if email was already sent (by cron job) to avoid duplicates
2. Gathers all unsent notifications for the cadence window
3. Sends the digest email
4. Marks notifications as sent
"""
logger.info(f'<Email Cadence> Sending cadence email of type {cadence_type}')
users = get_audience_for_cadence_email(cadence_type)
language_prefs = get_language_preference_for_users([user.id for user in users])
courses_data = {}
start_date, end_date = get_start_end_date(cadence_type)
logger.info(f'<Email Cadence> Email Cadence Audience {len(users)}')
for user in users:
user_language = language_prefs.get(user.id, 'en')
send_digest_email_to_user(user, cadence_type, start_date, end_date, user_language=user_language,
courses_data=courses_data)
try:
if not ENABLE_EMAIL_NOTIFICATIONS.is_enabled():
logger.info(f'<Digest Task> Email notifications disabled, skipping user {user_id}')
return

user = User.objects.get(id=user_id)

if not user.has_usable_password():
Comment on lines +350 to +353
logger.info(f'<Digest Task> User {user.username} is disabled, skipping')
_cleanup_digest_schedule_for_current_window(user_id, cadence_type)
return

if not is_email_notification_flag_enabled(user):
logger.info(f'<Digest Task> Email flag disabled for user {user.username}')
_cleanup_digest_schedule_for_current_window(user_id, cadence_type)
return

start_date, end_date = get_start_end_date(cadence_type)

already_sent = Notification.objects.filter(
user_id=user_id,
email=True,
email_sent_on__gte=start_date,
email_sent_on__lte=end_date,
).exists()

if already_sent:
logger.info(
f'<Digest Task> Digest already sent for user {user.username} '
f'in window {start_date} to {end_date}. Clearing scheduled flags.'
)
# Clear scheduled flags so they're not picked up again
Notification.objects.filter(
user_id=user_id,
email=True,
email_scheduled=True,
created__gte=start_date,
created__lte=end_date,
).update(email_scheduled=False)
_cleanup_digest_schedule_for_current_window(user_id, cadence_type)
return

language_prefs = get_language_preference_for_users([user_id])
user_language = language_prefs.get(user_id, 'en')
courses_data = {}

send_digest_email_to_user(
user, cadence_type, start_date, end_date,
user_language=user_language,
courses_data=courses_data
)

# Clear scheduled flags after successful send
Notification.objects.filter(
user_id=user_id,
email=True,
email_scheduled=True,
created__gte=start_date,
created__lte=end_date,
).update(email_scheduled=False)

# Remove only the current window's DigestSchedule record — future
# windows that may have been scheduled concurrently must be preserved.
_cleanup_digest_schedule_for_current_window(user_id, cadence_type)

logger.info(f'<Digest Task> Successfully sent {cadence_type} digest to user {user.username}')

except User.DoesNotExist:
logger.error(f'<Digest Task> User {user_id} not found')
# Clean up the orphaned DigestSchedule so future windows are not blocked.
_cleanup_digest_schedule_for_current_window(user_id, cadence_type)

except Exception as exc:
logger.exception(f'<Digest Task> Failed to send digest to user {user_id}: {exc}')
retry_countdown = 300 * (2 ** self.request.retries)
raise self.retry(exc=exc, countdown=retry_countdown)


def _cleanup_digest_schedule_for_current_window(user_id, cadence_type):
"""
Remove DigestSchedule records only for the current delivery window.
Comment on lines +423 to +426

This ensures that a future window's DigestSchedule (created when a new
notification arrives after the current task was scheduled) is preserved.
"""
now = django_timezone.now()

if cadence_type == EmailCadence.DAILY:
# The current window's delivery_time is at most 1 day + buffer in the past
window_cutoff = now - timedelta(days=1, hours=1)
elif cadence_type == EmailCadence.WEEKLY:
window_cutoff = now - timedelta(days=7, hours=1)
else:
return

DigestSchedule.objects.filter(
user_id=user_id,
cadence_type=cadence_type,
delivery_time__lte=now,
delivery_time__gte=window_cutoff,
).delete()


def send_immediate_cadence_email(email_notification_mapping, course_key):
Expand Down
Loading
Loading